Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/11/24 02:50:18 UTC

[GitHub] [inlong] e-mhui commented on a diff in pull request #6541: [INLONG-6495][Sort] Support metrics and restore metrics for single-table of Doris

e-mhui commented on code in PR #6541:
URL: https://github.com/apache/inlong/pull/6541#discussion_r1031010249


##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -144,8 +194,49 @@ public static DorisDynamicSchemaOutputFormat.Builder builder() {
         return new DorisDynamicSchemaOutputFormat.Builder();
     }
 
+    private String parseKeysType() {
+        try {
+            Schema schema = RestService.getSchema(options, readOptions, LOG);
+            return schema.getKeysType();
+        } catch (DorisException e) {
+            throw new RuntimeException("Failed fetch doris table schema: " + options.getTableIdentifier());
+        }
+    }
+
+    private void handleStreamLoadProp() {
+        Properties props = executionOptions.getStreamLoadProp();
+        boolean ifEscape = Boolean.parseBoolean(props.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT));
+        this.fieldDelimiter = props.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT);
+        this.lineDelimiter = props.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
+        if (ifEscape) {
+            this.fieldDelimiter = DorisParseUtils.escapeString(fieldDelimiter);
+            this.lineDelimiter = DorisParseUtils.escapeString(lineDelimiter);
+            props.remove(ESCAPE_DELIMITERS_KEY);
+        }
+
+        //add column key when fieldNames is not empty
+        if (!props.containsKey(COLUMNS_KEY) && fieldNames != null && fieldNames.length > 0) {
+            String columns = String.join(",", Arrays.stream(fieldNames)
+                    .map(item -> String.format("`%s`", item.trim().replace("`", "")))
+                    .collect(Collectors.toList()));
+            props.put(COLUMNS_KEY, columns);
+        }
+
+        // if enable batch delete, the columns must add tag '__DORIS_DELETE_SIGN__'
+        String columns = (String) props.get(COLUMNS_KEY);
+        if (!columns.contains(DORIS_DELETE_SIGN)

Review Comment:
   Write this condition on a single line.



##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -159,8 +257,28 @@ public void open(int taskNumber, int numTasks) throws IOException {
                 options.getUsername(),
                 options.getPassword(),
                 executionOptions.getStreamLoadProp());
-        jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat)
-                DynamicSchemaFormatFactory.getFormat(dynamicSchemaFormat);
+        if (!multipleSink) {
+            Properties loadProperties = executionOptions.getStreamLoadProp();

Review Comment:
   It would be better to move the columns-processing code into `handleStreamLoadProp()`.
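
As a rough illustration of that suggestion, the columns handling could live in one helper that `handleStreamLoadProp()` calls, so `open()` no longer touches the `columns` property. This is only a sketch under stated assumptions, not the PR's actual code: the class name, the method name `handleColumns`, the `enableBatchDelete` parameter, and the literal values of `COLUMNS_KEY` and `DORIS_DELETE_SIGN` are hypothetical stand-ins for whatever the connector defines.

```java
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical stand-alone sketch; in the connector this logic would sit
// inside handleStreamLoadProp() rather than in a separate class.
class StreamLoadColumnsSketch {
    // Assumed values; the real constants are defined by the connector.
    static final String COLUMNS_KEY = "columns";
    static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";

    static void handleColumns(Properties props, String[] fieldNames, boolean enableBatchDelete) {
        // Add the columns key when the user did not set it and field names are known.
        if (!props.containsKey(COLUMNS_KEY) && fieldNames != null && fieldNames.length > 0) {
            String joined = Arrays.stream(fieldNames)
                    .map(item -> String.format("`%s`", item.trim().replace("`", "")))
                    .collect(Collectors.joining(","));
            props.put(COLUMNS_KEY, joined);
        }

        // With batch delete enabled, the columns list must carry the delete sign.
        // The null check covers the case where no columns were set at all.
        String columns = props.getProperty(COLUMNS_KEY);
        if (enableBatchDelete && columns != null && !columns.contains(DORIS_DELETE_SIGN)) {
            props.put(COLUMNS_KEY, columns + "," + DORIS_DELETE_SIGN);
        }
    }
}
```

Keeping both steps in one place means the property is normalized once, before the stream load is created, and the delete-sign check always sees the final column list.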



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
