Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/27 08:53:34 UTC

[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6298: [INLONG-6296][Sort] Split one record to multiple records when the physical data has more records for KafkaLoadNode

yunqingmoswu commented on code in PR #6298:
URL: https://github.com/apache/inlong/pull/6298#discussion_r1006588475


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##########
@@ -172,6 +189,101 @@ public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable L
                 readMetadata(consumedRow, KafkaDynamicSink.WritableMetadata.HEADERS));
     }
 
+    /**
+     * Serialize for list it is used for multiple sink scenes when a record contains mulitple real records.
+     *
+     * @param consumedRow The consumeRow
+     * @param timestamp The timestamp
+     * @return List of ProducerRecord
+     */
+    public List<ProducerRecord<byte[], byte[]>> serializeForList(RowData consumedRow, @Nullable Long timestamp) {
+        if (!multipleSink) {
+            return Collections.singletonList(serialize(consumedRow, timestamp));
+        }
+        List<ProducerRecord<byte[], byte[]>> values = new ArrayList<>();
+        try {
+            JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(consumedRow.getBinary(0));
+            boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
+            if (isDDL) {
+                values.add(new ProducerRecord<>(
+                        jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
+                        extractPartition(consumedRow, null, consumedRow.getBinary(0)),
+                        null,
+                        consumedRow.getBinary(0)));
+                return values;
+            }
+            JsonNode updateBeforeNode = jsonDynamicSchemaFormat.getUpdateBefore(rootNode);
+            JsonNode updateAfterNode = jsonDynamicSchemaFormat.getUpdateAfter(rootNode);
+            boolean splitRequired = (updateAfterNode != null && updateAfterNode.isArray()

Review Comment:
   It is a good idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
