You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2023/03/01 06:24:18 UTC
[inlong] branch master updated: [INLONG-7392][Sort] Refactor Doris single table to solve performance issues (#7439)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 57c04c8f5 [INLONG-7392][Sort] Refactor Doris single table to solve performance issues (#7439)
57c04c8f5 is described below
commit 57c04c8f5f9bb26206bb1be2fbd61a9e9dd307b2
Author: Yizhou Yang <32...@users.noreply.github.com>
AuthorDate: Wed Mar 1 14:24:13 2023 +0800
[INLONG-7392][Sort] Refactor Doris single table to solve performance issues (#7439)
---
.../table/DorisDynamicSchemaOutputFormat.java | 33 +++++------------
.../inlong/sort/doris/util/DorisParseUtils.java | 41 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 25 deletions(-)
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 84f573334..ebc8cfca2 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -149,6 +149,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
private final LogicalType[] logicalTypes;
private final DirtyOptions dirtyOptions;
private @Nullable final DirtySink<Object> dirtySink;
+ private transient Schema schema;
public DorisDynamicSchemaOutputFormat(DorisOptions option,
DorisReadOptions readOptions,
@@ -191,15 +192,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
return new DorisDynamicSchemaOutputFormat.Builder();
}
- private String parseKeysType() {
- try {
- Schema schema = RestService.getSchema(options, readOptions, LOG);
- return schema.getKeysType();
- } catch (DorisException e) {
- throw new RuntimeException("Failed fetch doris table schema: " + options.getTableIdentifier());
- }
- }
-
private void handleStreamLoadProp() {
Properties props = executionOptions.getStreamLoadProp();
boolean ifEscape = Boolean.parseBoolean(props.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT));
@@ -230,12 +222,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
if (multipleSink) {
return executionOptions.getEnableDelete();
}
- try {
- Schema schema = RestService.getSchema(options, readOptions, LOG);
- return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(schema.getKeysType());
- } catch (DorisException e) {
- throw new RuntimeException("Failed fetch doris single table schema: " + options.getTableIdentifier(), e);
- }
+ return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(schema.getKeysType());
}
@Override
@@ -251,16 +238,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
handleStreamLoadProp();
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
for (int i = 0; i < logicalTypes.length; i++) {
- fieldGetters[i] = RowData.createFieldGetter(logicalTypes[i], i);
- if ("DATE".equalsIgnoreCase(logicalTypes[i].toString())) {
- int finalI = i;
- fieldGetters[i] = row -> {
- if (row.isNullAt(finalI)) {
- return null;
- }
- return DorisParseUtils.epochToDate(row.getInt(finalI));
- };
- }
+ fieldGetters[i] = DorisParseUtils.createFieldGetter(logicalTypes[i], i);
+ }
+ try {
+ schema = RestService.getSchema(options, readOptions, LOG);
+ } catch (DorisException e) {
+ throw new RuntimeException(e);
}
}
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java
index d5cc32169..6428df94b 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java
@@ -17,6 +17,9 @@
package org.apache.inlong.sort.doris.util;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
import java.time.LocalDate;
@@ -49,6 +52,28 @@ public class DorisParseUtils {
}
}
+ /**
+ * Creates a field getter for the field at {@code pos}, with special handling for DATE fields.
+ * <p/>
+ * For a DATE field the returned getter yields {@code null} for a null field; otherwise it reads the
+ * field with {@code getInt} and converts it via {@code epochToDate}. Every other type is delegated to
+ * {@link RowData#createFieldGetter(LogicalType, int)}.
+ *
+ * @param type the logical type of the field
+ * @param pos the position of the field within the row
+ * @return the field getter created
+ */
+ public static FieldGetter createFieldGetter(LogicalType type, int pos) {
+ // Match on the type root, not type.toString(): toString() includes nullability, so a NOT NULL
+ // column renders as "DATE NOT NULL" and would silently skip the DATE conversion below.
+ if (!LogicalTypeEnum.DATE.getType().equals(type.getTypeRoot().name())) {
+ return RowData.createFieldGetter(type, pos);
+ }
+ return row -> {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ return DorisParseUtils.epochToDate(row.getInt(pos));
+ };
+ }
+
/**
* A utility used to parse a string according to the given hexadecimal escape sequence.
* <p/>
@@ -88,4 +113,20 @@ public class DorisParseUtils {
throw new IllegalArgumentException(
"Convert to LocalDate failed from unexpected value '" + obj + "' of type " + obj.getClass().getName());
}
+
+ /**
+ * Names of logical types that {@code createFieldGetter} handles specially.
+ */
+ private enum LogicalTypeEnum {
+
+ /** The DATE logical type. */
+ DATE("DATE");
+
+ /** Type name associated with this constant. */
+ private final String typeName;
+
+ LogicalTypeEnum(String typeName) {
+ this.typeName = typeName;
+ }
+
+ /**
+ * @return the type name associated with this constant
+ */
+ public String getType() {
+ return typeName;
+ }
+ }
+
}