You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/03 10:11:27 UTC

[inlong] branch master updated: [INLONG-7767][Sort] Doris connector does not real delete record because of columns header losing (#7768)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ca71e9f8a [INLONG-7767][Sort] Doris connector does not real delete record because of columns header losing (#7768)
ca71e9f8a is described below

commit ca71e9f8a20fdedd721c6d49ceea8fcba605a250
Author: Liao Rui <li...@users.noreply.github.com>
AuthorDate: Mon Apr 3 18:11:21 2023 +0800

    [INLONG-7767][Sort] Doris connector does not real delete record because of columns header losing (#7768)
---
 .../inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java   | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index c056a92ee..db446f0ce 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -688,7 +688,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         try {
             // support csv and json format
             String format = executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY, FORMAT_JSON_VALUE);
-            loadValue = serialize(values, format);
+            loadValue = serialize(tableIdentifier, values, format);
             respContent = load(tableIdentifier, loadValue);
             try {
                 if (null != metricData && null != respContent) {
@@ -785,7 +785,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
      * @return string
      * @throws JsonProcessingException
      */
-    private String serialize(List values, String format) throws JsonProcessingException {
+    private String serialize(String tableIdentifier, List values, String format) throws JsonProcessingException {
         if (FORMAT_CSV_VALUE.equalsIgnoreCase(format)) {
             LOG.info("doris data format: {}", format);
             // set columns, and format json data to csv
@@ -822,6 +822,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             }
             return csvData.toString();
         } else {
+            // Dynamic set COLUMNS_KEY for tableIdentifier every time for multiple sink scenario
+            if (multipleSink) {
+                executionOptions.getStreamLoadProp().put(COLUMNS_KEY, columnsMap.get(tableIdentifier));
+            }
             return OBJECT_MAPPER.writeValueAsString(values);
         }
     }