You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/03 10:57:12 UTC

[inlong] 03/03: [INLONG-7100][Manager][Sort][Dashboard] Support partition key in Hudi sink (#7101)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit ee17285f03280850e1b3511ba4374e6fc1a0f488
Author: feat <fe...@outlook.com>
AuthorDate: Tue Jan 3 16:47:08 2023 +0800

    [INLONG-7100][Manager][Sort][Dashboard] Support partition key in Hudi sink (#7101)
---
 inlong-dashboard/src/locales/cn.json               |  4 +-
 inlong-dashboard/src/locales/en.json               |  4 +-
 inlong-dashboard/src/metas/sinks/defaults/Hudi.ts  | 56 +++++-----------------
 .../inlong/manager/pojo/sink/hudi/HudiSink.java    |  2 +-
 .../inlong/manager/pojo/sink/hudi/HudiSinkDTO.java | 16 ++-----
 .../manager/pojo/sink/hudi/HudiSinkRequest.java    |  2 +-
 .../manager/pojo/sink/hudi/HudiTableInfo.java      |  2 +
 .../manager/pojo/sort/util/LoadNodeUtils.java      | 11 +----
 .../resource/sink/hudi/HudiCatalogClient.java      |  1 -
 .../service/sink/hudi/HudiSinkOperator.java        | 28 ++++++++++-
 .../sort/protocol/node/load/HudiLoadNode.java      | 15 ++----
 .../sort/protocol/node/load/HudiLoadNodeTest.java  |  2 +-
 .../inlong/sort/parser/HudiNodeSqlParserTest.java  |  5 +-
 13 files changed, 59 insertions(+), 89 deletions(-)

diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index d17d4a1ef..b778fb42d 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -204,9 +204,9 @@
   "meta.Sinks.Hudi.FieldType": "字段类型",
   "meta.Sinks.Hudi.FieldDescription": "字段描述",
   "meta.Sinks.Hudi.PrimaryKey": "主键",
-  "meta.Sinks.Hudi.PartitionFieldList": "分区字段",
+  "meta.Sinks.Hudi.PartitionKey": "分区字段",
   "meta.Sinks.Hudi.PrimaryKeyHelper": "主键字段,以逗号(,)分割",
-  "meta.Sinks.Hudi.PartitionFieldListHelp": "字段类型若为timestamp,则必须设置此字段值的格式,支持 MICROSECONDS,MILLISECONDS,SECONDS,SQL,ISO_8601,以及自定义,比如:yyyy-MM-dd HH:mm:ss 等",
+  "meta.Sinks.Hudi.PartitionKeyHelp": "分区字段列表,以英文逗号(,)分隔.",
   "meta.Sinks.Hudi.FieldFormat": "字段格式",
   "meta.Sinks.Hudi.ExtListHelper": "hudi表的DDL属性需带前缀'ddl.'",
   "meta.Sinks.Greenplum.TableName": "表名称",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index 8ba00e92b..b44a94ff5 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -205,8 +205,8 @@
   "meta.Sinks.Hudi.FieldDescription": "FieldDescription",
   "meta.Sinks.Hudi.PrimaryKey": "PrimaryKey",
   "meta.Sinks.Hudi.PrimaryKeyHelper": "The Primary key fields, separated by commas (,)",
-  "meta.Sinks.Hudi.PartitionFieldList": "PartitionFieldList",
-  "meta.Sinks.Hudi.PartitionFieldListHelp": "If the field type is timestamp, you must set the format of the field value, support MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601, and custom, such as: yyyy-MM-dd HH:mm:ss, etc.",
+  "meta.Sinks.Hudi.PartitionKey": "PartitionKey",
+  "meta.Sinks.Hudi.PartitionKeyHelp": "A list of partition fields, separated by commas.",
   "meta.Sinks.Hudi.FieldFormat": "FieldFormat",
   "meta.Sinks.Hudi.ExtListHelper": "The DDL attribute of the hudi table needs to be prefixed with 'ddl.'",
   "meta.Sinks.Greenplum.TableName": "TableName",
diff --git a/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts b/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts
index ad4648ac2..15f173a18 100644
--- a/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts
+++ b/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts
@@ -262,50 +262,6 @@ export default class HudiSink extends SinkInfo implements DataWithBackend, Rende
   })
   sinkFieldList: Record<string, unknown>[];
 
-  @FieldDecorator({
-    type: EditableTable,
-    tooltip: i18n.t('meta.Sinks.Hudi.PartitionFieldListHelp'),
-    col: 24,
-    props: {
-      size: 'small',
-      required: false,
-      columns: [
-        {
-          title: i18n.t('meta.Sinks.Hudi.FieldName'),
-          dataIndex: 'fieldName',
-          rules: [{ required: true }],
-        },
-        {
-          title: i18n.t('meta.Sinks.Hudi.FieldType'),
-          dataIndex: 'fieldType',
-          type: 'select',
-          initialValue: 'string',
-          props: {
-            options: ['string', 'timestamp'].map(item => ({
-              label: item,
-              value: item,
-            })),
-          },
-        },
-        {
-          title: i18n.t('meta.Sinks.Hudi.FieldFormat'),
-          dataIndex: 'fieldFormat',
-          type: 'autocomplete',
-          props: {
-            options: ['MICROSECONDS', 'MILLISECONDS', 'SECONDS', 'SQL', 'ISO_8601'].map(item => ({
-              label: item,
-              value: item,
-            })),
-          },
-          rules: [{ required: true }],
-          visible: (text, record) => record.fieldType === 'timestamp',
-        },
-      ],
-    },
-  })
-  @I18n('meta.Sinks.Hudi.PartitionFieldList')
-  partitionFieldList: Record<string, unknown>[];
-
   @FieldDecorator({
     type: 'input',
     tooltip: i18n.t('meta.Sinks.Hudi.PrimaryKeyHelper'),
@@ -317,6 +273,18 @@ export default class HudiSink extends SinkInfo implements DataWithBackend, Rende
   @ColumnDecorator()
   @I18n('meta.Sinks.Hudi.PrimaryKey')
   primaryKey: string;
+
+  // Comma-separated partition field names; the input is disabled when status is 110 or 130.
+  // The tooltip key must match the 'PartitionKeyHelp' entry defined in cn.json and en.json.
+  @FieldDecorator({
+    type: 'input',
+    tooltip: i18n.t('meta.Sinks.Hudi.PartitionKeyHelp'),
+    rules: [{ required: false }],
+    props: values => ({ disabled: [110, 130].includes(values?.status) }),
+  })
+  @ColumnDecorator()
+  @I18n('meta.Sinks.Hudi.PartitionKey')
+  partitionKey: string;
 }
 
 const getFieldListColumns = sinkValues => {
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java
index 54e332f88..b23c1fad5 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java
@@ -77,7 +77,7 @@ public class HudiSink extends StreamSink {
     private List<HashMap<String, String>> extList;
 
     @ApiModelProperty("Partition field list")
-    private List<HudiPartitionField> partitionFieldList;
+    private String partitionKey;
 
     public HudiSink() {
         this.setSinkType(SinkType.HUDI);
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
index 616fc56f7..32b214060 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
@@ -26,7 +26,6 @@ import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.JsonUtils;
@@ -75,7 +74,7 @@ public class HudiSinkDTO {
     private List<HashMap<String, String>> extList;
 
     @ApiModelProperty("Partition field list")
-    private List<HudiPartitionField> partitionFieldList;
+    private String partitionKey;
 
     /**
      * Get the dto instance from the request
@@ -87,7 +86,7 @@ public class HudiSinkDTO {
                 .dbName(request.getDbName())
                 .tableName(request.getTableName())
                 .dataPath(request.getDataPath())
-                .partitionFieldList(request.getPartitionFieldList())
+                .partitionKey(request.getPartitionKey())
                 .fileFormat(request.getFileFormat())
                 .catalogType(request.getCatalogType())
                 .properties(request.getProperties())
@@ -112,16 +111,7 @@ public class HudiSinkDTO {
         tableInfo.setDbName(hudiInfo.getDbName());
         tableInfo.setTableName(hudiInfo.getTableName());
 
-        // Set partition fields
-        if (CollectionUtils.isNotEmpty(hudiInfo.getPartitionFieldList())) {
-            for (HudiPartitionField field : hudiInfo.getPartitionFieldList()) {
-                HudiColumnInfo columnInfo = new HudiColumnInfo();
-                columnInfo.setName(field.getFieldName());
-                columnInfo.setPartition(true);
-                columnInfo.setType("string");
-                columnList.add(columnInfo);
-            }
-        }
+        tableInfo.setPartitionKey(hudiInfo.getPartitionKey());
         tableInfo.setColumns(columnList);
         tableInfo.setPrimaryKey(hudiInfo.getPrimaryKey());
         tableInfo.setFileFormat(hudiInfo.getFileFormat());
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java
index a51d0a153..bd6c1833b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java
@@ -63,7 +63,7 @@ public class HudiSinkRequest extends SinkRequest {
     private List<HashMap<String, String>> extList;
 
     @ApiModelProperty("Partition field list")
-    private List<HudiPartitionField> partitionFieldList;
+    private String partitionKey;
 
     @ApiModelProperty("Primary key")
     private String primaryKey;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
index 24e4d7260..324e14ac4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
@@ -35,4 +35,6 @@ public class HudiTableInfo {
     private List<HudiColumnInfo> columns;
 
     private String primaryKey;
+
+    private String partitionKey;
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 240f208ca..d8f47eda3 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -409,14 +409,7 @@ public class LoadNodeUtils {
     public static HudiLoadNode createLoadNode(HudiSink hudiSink, List<FieldInfo> fieldInfos,
             List<FieldRelation> fieldRelations, Map<String, String> properties) {
         HudiConstant.CatalogType catalogType = HudiConstant.CatalogType.forName(hudiSink.getCatalogType());
-        List<FieldInfo> partitionFields = Lists.newArrayList();
-        if (CollectionUtils.isNotEmpty(hudiSink.getPartitionFieldList())) {
-            partitionFields = hudiSink.getPartitionFieldList().stream()
-                    .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hudiSink.getSinkName(),
-                            FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
-                                    partitionField.getFieldFormat())))
-                    .collect(Collectors.toList());
-        }
+
         return new HudiLoadNode(
                 hudiSink.getSinkName(),
                 hudiSink.getSinkName(),
@@ -433,7 +426,7 @@ public class LoadNodeUtils {
                 hudiSink.getCatalogUri(),
                 hudiSink.getWarehouse(),
                 hudiSink.getExtList(),
-                partitionFields);
+                hudiSink.getPartitionKey());
     }
 
     /**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
index 8055114ad..cff7995f7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
@@ -141,7 +141,6 @@ public class HudiCatalogClient {
                 // filter out the metadata columns
                 .filter(s -> !HoodieAvroUtils.isMetadataField(s.getName()))
                 .collect(Collectors.toList());
-        allCols.addAll(hiveTable.getPartitionKeys());
 
         return allCols.stream()
                 .map((FieldSchema s) -> {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
index 953137cc2..734697296 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
@@ -17,9 +17,12 @@
 
 package org.apache.inlong.manager.service.sink.hudi;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -72,7 +75,28 @@ public class HudiSinkOperator extends AbstractSinkOperator {
         Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
                 ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
         HudiSinkRequest sinkRequest = (HudiSinkRequest) request;
-        List<HashMap<String, String>> extList = sinkRequest.getExtList();
+
+        // Both keys are optional; when supplied, each must name a field in sinkFieldList.
+        String partitionKey = sinkRequest.getPartitionKey();
+        String primaryKey = sinkRequest.getPrimaryKey();
+        boolean partitionKeyExist = StringUtils.isNotEmpty(partitionKey);
+        boolean primaryKeyExist = StringUtils.isNotEmpty(primaryKey);
+        if (primaryKeyExist || partitionKeyExist) {
+            Set<String> fieldNames = sinkRequest.getSinkFieldList().stream().map(SinkField::getFieldName)
+                    .collect(Collectors.toSet());
+            // Guava's checkState fills in %s placeholders; SLF4J-style {} is left unformatted.
+            // NOTE(review): UI help text allows comma-separated keys, which contains() rejects - confirm.
+            if (partitionKeyExist) {
+                checkState(fieldNames.contains(partitionKey),
+                        "The partitionKey(%s) must be included in the sinkFieldList(%s)",
+                        partitionKey, fieldNames);
+            }
+            if (primaryKeyExist) {
+                checkState(fieldNames.contains(primaryKey),
+                        "The primaryKey(%s) must be included in the sinkFieldList(%s)",
+                        primaryKey, fieldNames);
+            }
+        }
 
         try {
             HudiSinkDTO dto = HudiSinkDTO.getFromRequest(sinkRequest);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
index b1448a82b..a150d0c40 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import lombok.Data;
@@ -92,8 +91,8 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
     @JsonProperty("extList")
     private List<HashMap<String, String>> extList;
 
-    @JsonProperty("partitionFields")
-    private List<FieldInfo> partitionFields;
+    @JsonProperty("partitionKey")
+    private String partitionKey;
 
     @JsonCreator
     public HudiLoadNode(
@@ -112,7 +111,7 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
             @JsonProperty("uri") String uri,
             @JsonProperty("warehouse") String warehouse,
             @JsonProperty("extList") List<HashMap<String, String>> extList,
-            @JsonProperty("partitionFields") List<FieldInfo> partitionFields) {
+            @JsonProperty("partitionKey") String partitionKey) {
         super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
         this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
         this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
@@ -121,7 +120,7 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
         this.uri = uri;
         this.warehouse = warehouse;
         this.extList = extList;
-        this.partitionFields = partitionFields;
+        this.partitionKey = partitionKey;
     }
 
     @Override
@@ -137,11 +136,7 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
         options.put(HUDI_OPTION_HIVE_SYNC_METASTORE_URIS, uri);
 
         // partition field
-        if (partitionFields != null && !partitionFields.isEmpty()) {
-            String partitionKey =
-                    partitionFields.stream()
-                            .map(FieldInfo::getName)
-                            .collect(Collectors.joining(","));
+        if (StringUtils.isNoneBlank(partitionKey)) {
             options.put(HUDI_OPTION_PARTITION_PATH_FIELD_NAME, partitionKey);
         }
 
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
index c3579177d..a8e64bcb3 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
@@ -51,6 +51,6 @@ public class HudiLoadNodeTest extends SerializeBaseTest<HudiLoadNode> {
                 "thrift://localhost:9083",
                 "hdfs://localhost:9000/user/hudi/warehouse",
                 new ArrayList<>(),
-                new ArrayList<>());
+                "f1");
     }
 }
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
index 8ddd3fd9b..87c820ca6 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import org.apache.commons.compress.utils.Lists;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -106,7 +105,7 @@ public class HudiNodeSqlParserTest extends AbstractTestBase {
                 null,
                 "hdfs://localhost:9000/hudi/warehouse",
                 extList,
-                Lists.newArrayList());
+                "f1");
     }
 
     private HudiLoadNode buildHudiLoadNodeWithHiveCatalog() {
@@ -145,7 +144,7 @@ public class HudiNodeSqlParserTest extends AbstractTestBase {
                 "thrift://localhost:9083",
                 "/hive/warehouse",
                 extList,
-                Lists.newArrayList());
+                "f1");
     }
 
     /**