You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/05/17 05:03:46 UTC
[ignite-extensions] 01/01: IGNITE-16871 Code review fixes.
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch IGNITE-16757
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
commit e487a1c8d1b828ca6b97bb0dc45969c28d39e9ea
Author: Nikolay Izhikov <ni...@apache.org>
AuthorDate: Tue May 17 08:03:37 2022 +0300
IGNITE-16871 Code review fixes.
---
.../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java | 69 ++++++++++++----------
.../cdc/kafka/KafkaToIgniteCdcStreamerApplier.java | 3 +-
2 files changed, 39 insertions(+), 33 deletions(-)
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index fee991b..d821e2e 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -184,61 +184,66 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
return true;
});
- while (filtered.hasNext()) {
- sendLimited(
- filtered,
- evt -> new ProducerRecord<>(
- evtTopic,
- evt.partition() % kafkaParts,
- evt.cacheId(),
- IgniteUtils.toBytes(evt)
- ),
- evtsCnt
- );
- }
+ sendAll(
+ filtered,
+ evt -> new ProducerRecord<>(
+ evtTopic,
+ evt.partition() % kafkaParts,
+ evt.cacheId(),
+ IgniteUtils.toBytes(evt)
+ ),
+ evtsCnt
+ );
return true;
}
/** {@inheritDoc} */
@Override public void onTypes(Iterator<BinaryType> types) {
- while (types.hasNext()) {
- sendLimited(
- types,
- t -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(((BinaryTypeImpl)t).metadata())),
- typesCnt
- );
- }
+ sendAll(
+ types,
+ t -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(((BinaryTypeImpl)t).metadata())),
+ typesCnt
+ );
sendMetaUpdatedMarkers();
}
/** {@inheritDoc} */
@Override public void onMappings(Iterator<TypeMapping> mappings) {
- while (mappings.hasNext()) {
- sendLimited(
- mappings,
- m -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(m)),
- mappingsCnt
- );
- }
+ sendAll(
+ mappings,
+ m -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(m)),
+ mappingsCnt
+ );
sendMetaUpdatedMarkers();
}
/** Send marker(meta need to be updated) record to each partition of events topic. */
private void sendMetaUpdatedMarkers() {
- Iterator<Integer> parts = IntStream.range(0, kafkaParts).iterator();
-
- while (parts.hasNext())
- sendLimited(parts, p -> new ProducerRecord<>(evtTopic, p, null, META_UPDATE_MARKER), evtsCnt);
+ sendAll(
+ IntStream.range(0, kafkaParts).iterator(),
+ p -> new ProducerRecord<>(evtTopic, p, null, META_UPDATE_MARKER),
+ evtsCnt
+ );
if (log.isDebugEnabled())
log.debug("Meta update markers sent.");
}
- /** Send limited amount of data to Kafka. */
- private <T> void sendLimited(
+ /** Send all data to Kafka. */
+ private <T> void sendAll(
+ Iterator<T> data,
+ Function<T, ProducerRecord<Integer, byte[]>> toRec,
+ AtomicLongMetric cntr
+ ) {
+ while (data.hasNext())
+ sendOneBatch(data, toRec, cntr);
+ }
+
+ /** Send one batch. */
+ private <T> void sendOneBatch(
Iterator<T> data,
Function<T, ProducerRecord<Integer, byte[]>> toRec,
AtomicLongMetric cntr
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index 3ed2650..acb2aeb 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -236,7 +237,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
private boolean filterAndPossiblyUpdateMetadata(ConsumerRecord<Integer, byte[]> rec) {
byte[] val = rec.value();
- if (val.length == META_UPDATE_MARKER.length && U.bytesEqual(val, 0, META_UPDATE_MARKER, 0, val.length)) {
+ if (rec.key() == null && Arrays.equals(val, META_UPDATE_MARKER)) {
metaUpdr.updateMetadata();
return false;