You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/05/17 05:03:45 UTC

[ignite-extensions] branch IGNITE-16757 created (now e487a1c)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a change to branch IGNITE-16757
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


      at e487a1c  IGNITE-16871 Code review fixes.

This branch includes the following new commits:

     new e487a1c  IGNITE-16871 Code review fixes.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite-extensions] 01/01: IGNITE-16871 Code review fixes.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch IGNITE-16757
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git

commit e487a1c8d1b828ca6b97bb0dc45969c28d39e9ea
Author: Nikolay Izhikov <ni...@apache.org>
AuthorDate: Tue May 17 08:03:37 2022 +0300

    IGNITE-16871 Code review fixes.
---
 .../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java | 69 ++++++++++++----------
 .../cdc/kafka/KafkaToIgniteCdcStreamerApplier.java |  3 +-
 2 files changed, 39 insertions(+), 33 deletions(-)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index fee991b..d821e2e 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -184,61 +184,66 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
             return true;
         });
 
-        while (filtered.hasNext()) {
-            sendLimited(
-                filtered,
-                evt -> new ProducerRecord<>(
-                    evtTopic,
-                    evt.partition() % kafkaParts,
-                    evt.cacheId(),
-                    IgniteUtils.toBytes(evt)
-                ),
-                evtsCnt
-            );
-        }
+        sendAll(
+            filtered,
+            evt -> new ProducerRecord<>(
+                evtTopic,
+                evt.partition() % kafkaParts,
+                evt.cacheId(),
+                IgniteUtils.toBytes(evt)
+            ),
+            evtsCnt
+        );
 
         return true;
     }
 
     /** {@inheritDoc} */
     @Override public void onTypes(Iterator<BinaryType> types) {
-        while (types.hasNext()) {
-            sendLimited(
-                types,
-                t -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(((BinaryTypeImpl)t).metadata())),
-                typesCnt
-            );
-        }
+        sendAll(
+            types,
+            t -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(((BinaryTypeImpl)t).metadata())),
+            typesCnt
+        );
 
         sendMetaUpdatedMarkers();
     }
 
     /** {@inheritDoc} */
     @Override public void onMappings(Iterator<TypeMapping> mappings) {
-        while (mappings.hasNext()) {
-            sendLimited(
-                mappings,
-                m -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(m)),
-                mappingsCnt
-            );
-        }
+        sendAll(
+            mappings,
+            m -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(m)),
+            mappingsCnt
+        );
 
         sendMetaUpdatedMarkers();
     }
 
     /** Send marker(meta need to be updated) record to each partition of events topic. */
     private void sendMetaUpdatedMarkers() {
-        Iterator<Integer> parts = IntStream.range(0, kafkaParts).iterator();
-
-        while (parts.hasNext())
-            sendLimited(parts, p -> new ProducerRecord<>(evtTopic, p, null, META_UPDATE_MARKER), evtsCnt);
+        sendAll(
+            IntStream.range(0, kafkaParts).iterator(),
+            p -> new ProducerRecord<>(evtTopic, p, null, META_UPDATE_MARKER),
+            evtsCnt
+        );
 
         if (log.isDebugEnabled())
             log.debug("Meta update markers sent.");
     }
 
-    /** Send limited amount of data to Kafka. */
-    private <T> void sendLimited(
+    /** Send all data to Kafka. */
+    private <T> void sendAll(
+        Iterator<T> data,
+        Function<T, ProducerRecord<Integer, byte[]>> toRec,
+        AtomicLongMetric cntr
+    ) {
+        while (data.hasNext())
+            sendOneBatch(data, toRec, cntr);
+    }
+
+    /** Send one batch. */
+    private <T> void sendOneBatch(
         Iterator<T> data,
         Function<T, ProducerRecord<Integer, byte[]>> toRec,
         AtomicLongMetric cntr
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index 3ed2650..acb2aeb 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -236,7 +237,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
     private boolean filterAndPossiblyUpdateMetadata(ConsumerRecord<Integer, byte[]> rec) {
         byte[] val = rec.value();
 
-        if (val.length == META_UPDATE_MARKER.length && U.bytesEqual(val, 0, META_UPDATE_MARKER, 0, val.length)) {
+        if (rec.key() == null && Arrays.equals(val, META_UPDATE_MARKER)) {
             metaUpdr.updateMetadata();
 
             return false;