You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2023/03/01 06:24:18 UTC

[inlong] branch master updated: [INLONG-7392][Sort] Refactor Doris single table to solve performance issues (#7439)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 57c04c8f5 [INLONG-7392][Sort] Refactor Doris single table to solve performance issues (#7439)
57c04c8f5 is described below

commit 57c04c8f5f9bb26206bb1be2fbd61a9e9dd307b2
Author: Yizhou Yang <32...@users.noreply.github.com>
AuthorDate: Wed Mar 1 14:24:13 2023 +0800

    [INLONG-7392][Sort] Refactor Doris single table to solve performance issues (#7439)
---
 .../table/DorisDynamicSchemaOutputFormat.java      | 33 +++++------------
 .../inlong/sort/doris/util/DorisParseUtils.java    | 41 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 25 deletions(-)

diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 84f573334..ebc8cfca2 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -149,6 +149,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     private final LogicalType[] logicalTypes;
     private final DirtyOptions dirtyOptions;
     private @Nullable final DirtySink<Object> dirtySink;
+    private transient Schema schema;
 
     public DorisDynamicSchemaOutputFormat(DorisOptions option,
             DorisReadOptions readOptions,
@@ -191,15 +192,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         return new DorisDynamicSchemaOutputFormat.Builder();
     }
 
-    private String parseKeysType() {
-        try {
-            Schema schema = RestService.getSchema(options, readOptions, LOG);
-            return schema.getKeysType();
-        } catch (DorisException e) {
-            throw new RuntimeException("Failed fetch doris table schema: " + options.getTableIdentifier());
-        }
-    }
-
     private void handleStreamLoadProp() {
         Properties props = executionOptions.getStreamLoadProp();
         boolean ifEscape = Boolean.parseBoolean(props.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT));
@@ -230,12 +222,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         if (multipleSink) {
             return executionOptions.getEnableDelete();
         }
-        try {
-            Schema schema = RestService.getSchema(options, readOptions, LOG);
-            return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(schema.getKeysType());
-        } catch (DorisException e) {
-            throw new RuntimeException("Failed fetch doris single table schema: " + options.getTableIdentifier(), e);
-        }
+        return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(schema.getKeysType());
     }
 
     @Override
@@ -251,16 +238,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             handleStreamLoadProp();
             this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
             for (int i = 0; i < logicalTypes.length; i++) {
-                fieldGetters[i] = RowData.createFieldGetter(logicalTypes[i], i);
-                if ("DATE".equalsIgnoreCase(logicalTypes[i].toString())) {
-                    int finalI = i;
-                    fieldGetters[i] = row -> {
-                        if (row.isNullAt(finalI)) {
-                            return null;
-                        }
-                        return DorisParseUtils.epochToDate(row.getInt(finalI));
-                    };
-                }
+                fieldGetters[i] = DorisParseUtils.createFieldGetter(logicalTypes[i], i);
+            }
+            try {
+                schema = RestService.getSchema(options, readOptions, LOG);
+            } catch (DorisException e) {
+                throw new RuntimeException(e);
             }
         }
 
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java
index d5cc32169..6428df94b 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.sort.doris.util;
 
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.RowKind;
 
 import java.time.LocalDate;
@@ -49,6 +52,28 @@ public class DorisParseUtils {
         }
     }
 
+    /**
+     * Creates a field getter for the field at {@code pos}; DATE fields are converted with
+     * {@link #epochToDate} instead of being returned as the raw {@code int} from {@link RowData#getInt}.
+     * The type root is matched (not {@code type.toString()}, which renders "DATE NOT NULL" for
+     * non-nullable columns) so that nullable and NOT NULL DATE fields are both converted.
+     *
+     * @param type the logical type of the field
+     * @param pos the position of the field within the row
+     * @return the created field getter
+     */
+    public static FieldGetter createFieldGetter(LogicalType type, int pos) {
+        if (!LogicalTypeEnum.DATE.getType().equals(type.getTypeRoot().name())) {
+            return RowData.createFieldGetter(type, pos);
+        }
+        return row -> {
+            if (row.isNullAt(pos)) {
+                return null;
+            }
+            return DorisParseUtils.epochToDate(row.getInt(pos));
+        };
+    }
+
     /**
      * A utility used to parse a string according to the given hexadecimal escape sequence.
      * <p/>
@@ -88,4 +113,20 @@ public class DorisParseUtils {
         throw new IllegalArgumentException(
                 "Convert to LocalDate failed from unexpected value '" + obj + "' of type " + obj.getClass().getName());
     }
+
+    /** Names of logical types that need a special field getter in {@link #createFieldGetter}. */
+    private enum LogicalTypeEnum {
+        DATE("DATE");
+
+        private final String typeName;
+
+        LogicalTypeEnum(String typeName) {
+            this.typeName = typeName;
+        }
+
+        public String getType() {
+            return typeName;
+        }
+    }
+
 }