You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/27 09:37:52 UTC

[inlong] branch master updated: [INLONG-6296][Sort] Split one record to multiple records when the physical data has more records (#6298)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fddb1d56 [INLONG-6296][Sort] Split one record to multiple records when the physical data has more records (#6298)
8fddb1d56 is described below

commit 8fddb1d56d6d29cb59920b4d4657770cdcec8d85
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Thu Oct 27 17:37:47 2022 +0800

    [INLONG-6296][Sort] Split one record to multiple records when the physical data has more records (#6298)
---
 .../sort/base/format/JsonDynamicSchemaFormat.java  |   2 +-
 .../kafka/DynamicKafkaSerializationSchema.java     | 124 ++++++++++++++++++++-
 .../inlong/sort/kafka/FlinkKafkaProducer.java      |  25 +++--
 3 files changed, 136 insertions(+), 15 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index 2c2da2f52..f49f01c0b 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -94,8 +94,8 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
                     .put(java.sql.Types.BOOLEAN, new BooleanType())
                     .put(java.sql.Types.OTHER, new VarCharType())
                     .build();
+    public final ObjectMapper objectMapper = new ObjectMapper();
     protected final JsonToRowDataConverters rowDataConverters;
-    private final ObjectMapper objectMapper = new ObjectMapper();
 
     protected JsonDynamicSchemaFormat() {
         this.rowDataConverters =
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
index 4a0d5dc8b..68f965f6b 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.kafka;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
 import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -28,6 +29,7 @@ import org.apache.flink.table.formats.raw.RawFormatSerializationSchema;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
 import org.apache.inlong.sort.kafka.KafkaDynamicSink.WritableMetadata;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
@@ -36,6 +38,13 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * A specific {@link KafkaSerializationSchema} for {@link KafkaDynamicSink}.
@@ -71,6 +80,8 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
      */
     private final int[] metadataPositions;
     private final String sinkMultipleFormat;
+    private boolean multipleSink;
+    private JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
     private int[] partitions;
 
     private int parallelInstanceId;
@@ -126,6 +137,13 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
         if (partitioner != null) {
             partitioner.open(parallelInstanceId, numParallelInstances);
         }
+        // Only support dynamic topic when the topicPattern is specified
+        //      and the valueSerialization is RawFormatSerializationSchema
+        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+            multipleSink = true;
+            jsonDynamicSchemaFormat =
+                    (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+        }
     }
 
     @Override
@@ -162,7 +180,6 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
         } else {
             valueSerialized = valueSerialization.serialize(valueRow);
         }
-
         return new ProducerRecord<>(
                 getTargetTopic(consumedRow),
                 extractPartition(consumedRow, keySerialized, valueSerialized),
@@ -172,6 +189,104 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
                 readMetadata(consumedRow, KafkaDynamicSink.WritableMetadata.HEADERS));
     }
 
+    /**
+     * Serialize a row into a list of records; used in multiple sink scenes when one record contains multiple real records.
+     *
+     * @param consumedRow The consumed row
+     * @param timestamp The timestamp
+     * @return List of ProducerRecord
+     */
+    public List<ProducerRecord<byte[], byte[]>> serializeForList(RowData consumedRow, @Nullable Long timestamp) {
+        if (!multipleSink) {
+            return Collections.singletonList(serialize(consumedRow, timestamp));
+        }
+        List<ProducerRecord<byte[], byte[]>> values = new ArrayList<>();
+        try {
+            JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(consumedRow.getBinary(0));
+            boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
+            if (isDDL) {
+                values.add(new ProducerRecord<>(
+                        jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
+                        extractPartition(consumedRow, null, consumedRow.getBinary(0)),
+                        null,
+                        consumedRow.getBinary(0)));
+                return values;
+            }
+            JsonNode updateBeforeNode = jsonDynamicSchemaFormat.getUpdateBefore(rootNode);
+            JsonNode updateAfterNode = jsonDynamicSchemaFormat.getUpdateAfter(rootNode);
+            if (!splitRequired(updateBeforeNode, updateAfterNode)) {
+                values.add(new ProducerRecord<>(
+                        jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
+                        extractPartition(consumedRow, null, consumedRow.getBinary(0)),
+                        null, consumedRow.getBinary(0)));
+            } else {
+                split2JsonArray(rootNode, updateBeforeNode, updateAfterNode, values);
+            }
+        } catch (IOException e) {
+            LOG.warn("deserialize error", e);
+            values.add(new ProducerRecord<>(topic, null, null, consumedRow.getBinary(0)));
+        }
+        return values;
+    }
+
+    private boolean splitRequired(JsonNode updateBeforeNode, JsonNode updateAfterNode) {
+        return (updateAfterNode != null && updateAfterNode.isArray()
+                && updateAfterNode.size() > 1) || (updateBeforeNode != null && updateBeforeNode.isArray()
+                && updateBeforeNode.size() > 1);
+    }
+
+    private void split2JsonArray(JsonNode rootNode,
+            JsonNode updateBeforeNode, JsonNode updateAfterNode, List<ProducerRecord<byte[], byte[]>> values) {
+        Iterator<Entry<String, JsonNode>> iterator = rootNode.fields();
+        Map<String, Object> baseMap = new LinkedHashMap<>();
+        String updateBeforeKey = null;
+        String updateAfterKey = null;
+        while (iterator.hasNext()) {
+            Entry<String, JsonNode> kv = iterator.next();
+            if (kv.getValue() == null || (!kv.getValue().equals(updateBeforeNode) && !kv.getValue()
+                    .equals(updateAfterNode))) {
+                baseMap.put(kv.getKey(), kv.getValue());
+                continue;
+            }
+            if (kv.getValue().equals(updateAfterNode)) {
+                updateAfterKey = kv.getKey();
+            } else if (kv.getValue().equals(updateBeforeNode)) {
+                updateBeforeKey = kv.getKey();
+            }
+        }
+        if (updateAfterNode != null) {
+            for (int i = 0; i < updateAfterNode.size(); i++) {
+                baseMap.put(updateAfterKey, Collections.singletonList(updateAfterNode.get(i)));
+                if (updateBeforeNode != null && updateBeforeNode.size() > i) {
+                    baseMap.put(updateBeforeKey, Collections.singletonList(updateBeforeNode.get(i)));
+                } else if (updateBeforeKey != null) {
+                    baseMap.remove(updateBeforeKey);
+                }
+                try {
+                    byte[] data = jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
+                    values.add(new ProducerRecord<>(
+                            jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
+                            extractPartition(null, null, data), null, data));
+                } catch (Exception e) {
+                    throw new RuntimeException("serialize for list error", e);
+                }
+            }
+        } else {
+            // In general, execution does not reach this branch
+            for (int i = 0; i < updateBeforeNode.size(); i++) {
+                baseMap.put(updateBeforeKey, Collections.singletonList(updateBeforeNode.get(i)));
+                try {
+                    byte[] data = jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
+                    values.add(new ProducerRecord<>(
+                            jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
+                            extractPartition(null, null, data), null, data));
+                } catch (Exception e) {
+                    throw new RuntimeException("serialize for list error", e);
+                }
+            }
+        }
+    }
+
     @Override
     public void setParallelInstanceId(int parallelInstanceId) {
         this.parallelInstanceId = parallelInstanceId;
@@ -189,14 +304,11 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
 
     @Override
     public String getTargetTopic(RowData element) {
-        // Only support dynamic topic when the topicPattern is specified
-        //      and the valueSerialization is RawFormatSerializationSchema
-        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+        if (multipleSink) {
             try {
                 //  Extract the index '0' as the raw data is determined by the Raw format:
                 //  The Raw format allows to read and write raw (byte based) values as a single column
-                return DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat)
-                        .parse(element.getBinary(0), topicPattern);
+                return jsonDynamicSchemaFormat.parse(element.getBinary(0), topicPattern);
             } catch (IOException e) {
                 // Ignore the parse error and it will return the default topic final.
                 LOG.warn("parse dynamic topic error", e);
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
index d3ca629cb..f59ff93da 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -54,6 +54,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetric
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TemporaryClassLoaderContext;
@@ -94,7 +95,6 @@ import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
@@ -934,7 +934,8 @@ public class FlinkKafkaProducer<IN>
             throws FlinkKafkaException {
         checkErroneous();
 
-        ProducerRecord<byte[], byte[]> record;
+        ProducerRecord<byte[], byte[]> record = null;
+        List<ProducerRecord<byte[], byte[]>> records = null;
         if (keyedSchema != null) {
             byte[] serializedKey = keyedSchema.serializeKey(next);
             byte[] serializedValue = keyedSchema.serializeValue(next);
@@ -967,9 +968,7 @@ public class FlinkKafkaProducer<IN>
                                 serializedKey,
                                 serializedValue);
             } else {
-                record =
-                        new ProducerRecord<>(
-                                targetTopic, null, timestamp, serializedKey, serializedValue);
+                record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
             }
         } else if (kafkaSchema != null) {
             if (kafkaSchema instanceof KafkaContextAware) {
@@ -986,18 +985,28 @@ public class FlinkKafkaProducer<IN>
                     partitions = getPartitionsByTopic(targetTopic, transaction.producer);
                     topicPartitionsMap.put(targetTopic, partitions);
                 }
-
                 contextAwareSchema.setPartitions(partitions);
             }
-            record = kafkaSchema.serialize(next, context.timestamp());
+            if (kafkaSchema instanceof DynamicKafkaSerializationSchema) {
+                records = ((DynamicKafkaSerializationSchema) kafkaSchema)
+                        .serializeForList((RowData) next, context.timestamp());
+            } else {
+                record = kafkaSchema.serialize(next, context.timestamp());
+            }
         } else {
             throw new RuntimeException(
                     "We have neither KafkaSerializationSchema nor KeyedSerializationSchema, this"
                             + "is a bug.");
         }
+        if (record != null) {
+            send(record, transaction);
+        } else if (records != null) {
+            records.forEach(r -> send(r, transaction));
+        }
+    }
 
+    private void send(ProducerRecord<byte[], byte[]> record, FlinkKafkaProducer.KafkaTransactionState transaction) {
         sendOutMetrics(1L, (long) record.value().length);
-
         pendingRecords.incrementAndGet();
         transaction.producer.send(record, callback);
     }