You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/03 10:57:12 UTC
[inlong] 03/03: [INLONG-7100][Manager][Sort][Dashboard] Support partition key in Hudi sink (#7101)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit ee17285f03280850e1b3511ba4374e6fc1a0f488
Author: feat <fe...@outlook.com>
AuthorDate: Tue Jan 3 16:47:08 2023 +0800
[INLONG-7100][Manager][Sort][Dashboard] Support partition key in Hudi sink (#7101)
---
inlong-dashboard/src/locales/cn.json | 4 +-
inlong-dashboard/src/locales/en.json | 4 +-
inlong-dashboard/src/metas/sinks/defaults/Hudi.ts | 56 +++++-----------------
.../inlong/manager/pojo/sink/hudi/HudiSink.java | 2 +-
.../inlong/manager/pojo/sink/hudi/HudiSinkDTO.java | 16 ++-----
.../manager/pojo/sink/hudi/HudiSinkRequest.java | 2 +-
.../manager/pojo/sink/hudi/HudiTableInfo.java | 2 +
.../manager/pojo/sort/util/LoadNodeUtils.java | 11 +----
.../resource/sink/hudi/HudiCatalogClient.java | 1 -
.../service/sink/hudi/HudiSinkOperator.java | 28 ++++++++++-
.../sort/protocol/node/load/HudiLoadNode.java | 15 ++----
.../sort/protocol/node/load/HudiLoadNodeTest.java | 2 +-
.../inlong/sort/parser/HudiNodeSqlParserTest.java | 5 +-
13 files changed, 59 insertions(+), 89 deletions(-)
diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index d17d4a1ef..b778fb42d 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -204,9 +204,9 @@
"meta.Sinks.Hudi.FieldType": "字段类型",
"meta.Sinks.Hudi.FieldDescription": "字段描述",
"meta.Sinks.Hudi.PrimaryKey": "主键",
- "meta.Sinks.Hudi.PartitionFieldList": "分区字段",
+ "meta.Sinks.Hudi.PartitionKey": "分区字段",
"meta.Sinks.Hudi.PrimaryKeyHelper": "主键字段,以逗号(,)分割",
- "meta.Sinks.Hudi.PartitionFieldListHelp": "字段类型若为timestamp,则必须设置此字段值的格式,支持 MICROSECONDS,MILLISECONDS,SECONDS,SQL,ISO_8601,以及自定义,比如:yyyy-MM-dd HH:mm:ss 等",
+ "meta.Sinks.Hudi.PartitionKeyHelp": "分区字段列表,以英文逗号(,)分隔.",
"meta.Sinks.Hudi.FieldFormat": "字段格式",
"meta.Sinks.Hudi.ExtListHelper": "hudi表的DDL属性需带前缀'ddl.'",
"meta.Sinks.Greenplum.TableName": "表名称",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index 8ba00e92b..b44a94ff5 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -205,8 +205,8 @@
"meta.Sinks.Hudi.FieldDescription": "FieldDescription",
"meta.Sinks.Hudi.PrimaryKey": "PrimaryKey",
"meta.Sinks.Hudi.PrimaryKeyHelper": "The Primary key fields, separated by commas (,)",
- "meta.Sinks.Hudi.PartitionFieldList": "PartitionFieldList",
- "meta.Sinks.Hudi.PartitionFieldListHelp": "If the field type is timestamp, you must set the format of the field value, support MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601, and custom, such as: yyyy-MM-dd HH:mm:ss, etc.",
+ "meta.Sinks.Hudi.PartitionKey": "PartitionKey",
+ "meta.Sinks.Hudi.PartitionKeyHelp": "A list of partition fields, separated by commas.",
"meta.Sinks.Hudi.FieldFormat": "FieldFormat",
"meta.Sinks.Hudi.ExtListHelper": "The DDL attribute of the hudi table needs to be prefixed with 'ddl.'",
"meta.Sinks.Greenplum.TableName": "TableName",
diff --git a/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts b/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts
index ad4648ac2..15f173a18 100644
--- a/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts
+++ b/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts
@@ -262,50 +262,6 @@ export default class HudiSink extends SinkInfo implements DataWithBackend, Rende
})
sinkFieldList: Record<string, unknown>[];
- @FieldDecorator({
- type: EditableTable,
- tooltip: i18n.t('meta.Sinks.Hudi.PartitionFieldListHelp'),
- col: 24,
- props: {
- size: 'small',
- required: false,
- columns: [
- {
- title: i18n.t('meta.Sinks.Hudi.FieldName'),
- dataIndex: 'fieldName',
- rules: [{ required: true }],
- },
- {
- title: i18n.t('meta.Sinks.Hudi.FieldType'),
- dataIndex: 'fieldType',
- type: 'select',
- initialValue: 'string',
- props: {
- options: ['string', 'timestamp'].map(item => ({
- label: item,
- value: item,
- })),
- },
- },
- {
- title: i18n.t('meta.Sinks.Hudi.FieldFormat'),
- dataIndex: 'fieldFormat',
- type: 'autocomplete',
- props: {
- options: ['MICROSECONDS', 'MILLISECONDS', 'SECONDS', 'SQL', 'ISO_8601'].map(item => ({
- label: item,
- value: item,
- })),
- },
- rules: [{ required: true }],
- visible: (text, record) => record.fieldType === 'timestamp',
- },
- ],
- },
- })
- @I18n('meta.Sinks.Hudi.PartitionFieldList')
- partitionFieldList: Record<string, unknown>[];
-
@FieldDecorator({
type: 'input',
tooltip: i18n.t('meta.Sinks.Hudi.PrimaryKeyHelper'),
@@ -317,6 +273,18 @@ export default class HudiSink extends SinkInfo implements DataWithBackend, Rende
@ColumnDecorator()
@I18n('meta.Sinks.Hudi.PrimaryKey')
primaryKey: string;
+
+ // Partition key input: comma-separated partition field names; disabled when status is 110 or 130.
+ // The tooltip key must match the locale entry 'meta.Sinks.Hudi.PartitionKeyHelp' (cn.json / en.json).
+ @FieldDecorator({
+ type: 'input',
+ tooltip: i18n.t('meta.Sinks.Hudi.PartitionKeyHelp'),
+ rules: [{ required: false }],
+ props: values => ({
+ disabled: [110, 130].includes(values?.status),
+ }),
+ })
+ @ColumnDecorator()
+ @I18n('meta.Sinks.Hudi.PartitionKey')
+ partitionKey: string;
}
const getFieldListColumns = sinkValues => {
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java
index 54e332f88..b23c1fad5 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java
@@ -77,7 +77,7 @@ public class HudiSink extends StreamSink {
private List<HashMap<String, String>> extList;
@ApiModelProperty("Partition field list")
- private List<HudiPartitionField> partitionFieldList;
+ private String partitionKey;
public HudiSink() {
this.setSinkType(SinkType.HUDI);
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
index 616fc56f7..32b214060 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
@@ -26,7 +26,6 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.JsonUtils;
@@ -75,7 +74,7 @@ public class HudiSinkDTO {
private List<HashMap<String, String>> extList;
@ApiModelProperty("Partition field list")
- private List<HudiPartitionField> partitionFieldList;
+ private String partitionKey;
/**
* Get the dto instance from the request
@@ -87,7 +86,7 @@ public class HudiSinkDTO {
.dbName(request.getDbName())
.tableName(request.getTableName())
.dataPath(request.getDataPath())
- .partitionFieldList(request.getPartitionFieldList())
+ .partitionKey(request.getPartitionKey())
.fileFormat(request.getFileFormat())
.catalogType(request.getCatalogType())
.properties(request.getProperties())
@@ -112,16 +111,7 @@ public class HudiSinkDTO {
tableInfo.setDbName(hudiInfo.getDbName());
tableInfo.setTableName(hudiInfo.getTableName());
- // Set partition fields
- if (CollectionUtils.isNotEmpty(hudiInfo.getPartitionFieldList())) {
- for (HudiPartitionField field : hudiInfo.getPartitionFieldList()) {
- HudiColumnInfo columnInfo = new HudiColumnInfo();
- columnInfo.setName(field.getFieldName());
- columnInfo.setPartition(true);
- columnInfo.setType("string");
- columnList.add(columnInfo);
- }
- }
+ tableInfo.setPartitionKey(hudiInfo.getPartitionKey());
tableInfo.setColumns(columnList);
tableInfo.setPrimaryKey(hudiInfo.getPrimaryKey());
tableInfo.setFileFormat(hudiInfo.getFileFormat());
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java
index a51d0a153..bd6c1833b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java
@@ -63,7 +63,7 @@ public class HudiSinkRequest extends SinkRequest {
private List<HashMap<String, String>> extList;
@ApiModelProperty("Partition field list")
- private List<HudiPartitionField> partitionFieldList;
+ private String partitionKey;
@ApiModelProperty("Primary key")
private String primaryKey;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
index 24e4d7260..324e14ac4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
@@ -35,4 +35,6 @@ public class HudiTableInfo {
private List<HudiColumnInfo> columns;
private String primaryKey;
+
+ private String partitionKey;
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 240f208ca..d8f47eda3 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -409,14 +409,7 @@ public class LoadNodeUtils {
public static HudiLoadNode createLoadNode(HudiSink hudiSink, List<FieldInfo> fieldInfos,
List<FieldRelation> fieldRelations, Map<String, String> properties) {
HudiConstant.CatalogType catalogType = HudiConstant.CatalogType.forName(hudiSink.getCatalogType());
- List<FieldInfo> partitionFields = Lists.newArrayList();
- if (CollectionUtils.isNotEmpty(hudiSink.getPartitionFieldList())) {
- partitionFields = hudiSink.getPartitionFieldList().stream()
- .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hudiSink.getSinkName(),
- FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
- partitionField.getFieldFormat())))
- .collect(Collectors.toList());
- }
+
return new HudiLoadNode(
hudiSink.getSinkName(),
hudiSink.getSinkName(),
@@ -433,7 +426,7 @@ public class LoadNodeUtils {
hudiSink.getCatalogUri(),
hudiSink.getWarehouse(),
hudiSink.getExtList(),
- partitionFields);
+ hudiSink.getPartitionKey());
}
/**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
index 8055114ad..cff7995f7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
@@ -141,7 +141,6 @@ public class HudiCatalogClient {
// filter out the metadata columns
.filter(s -> !HoodieAvroUtils.isMetadataField(s.getName()))
.collect(Collectors.toList());
- allCols.addAll(hiveTable.getPartitionKeys());
return allCols.stream()
.map((FieldSchema s) -> {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
index 953137cc2..734697296 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
@@ -17,9 +17,12 @@
package org.apache.inlong.manager.service.sink.hudi;
+import static com.google.common.base.Preconditions.checkState;
+
import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.HashMap;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -72,7 +75,28 @@ public class HudiSinkOperator extends AbstractSinkOperator {
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
HudiSinkRequest sinkRequest = (HudiSinkRequest) request;
- List<HashMap<String, String>> extList = sinkRequest.getExtList();
+
+ // Each comma-separated key must name a declared sink field; Guava's checkState expands only %s.
+ String partitionKey = sinkRequest.getPartitionKey();
+ String primaryKey = sinkRequest.getPrimaryKey();
+ boolean partitionKeyExist = StringUtils.isNotEmpty(partitionKey);
+ boolean primaryKeyExist = StringUtils.isNotEmpty(primaryKey);
+ if (primaryKeyExist || partitionKeyExist) {
+ Set<String> fieldNames = sinkRequest.getSinkFieldList().stream().map(SinkField::getFieldName)
+ .collect(Collectors.toSet());
+ if (partitionKeyExist) {
+ for (String key : StringUtils.split(partitionKey, ',')) {
+ checkState(fieldNames.contains(key.trim()),
+ "The partitionKey(%s) must be included in the sinkFieldList(%s)", key.trim(), fieldNames);
+ }
+ }
+ if (primaryKeyExist) {
+ for (String key : StringUtils.split(primaryKey, ',')) {
+ checkState(fieldNames.contains(key.trim()),
+ "The primaryKey(%s) must be included in the sinkFieldList(%s)", key.trim(), fieldNames);
+ }
+ }
+ }
try {
HudiSinkDTO dto = HudiSinkDTO.getFromRequest(sinkRequest);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
index b1448a82b..a150d0c40 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Data;
@@ -92,8 +91,8 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
@JsonProperty("extList")
private List<HashMap<String, String>> extList;
- @JsonProperty("partitionFields")
- private List<FieldInfo> partitionFields;
+ @JsonProperty("partitionKey")
+ private String partitionKey;
@JsonCreator
public HudiLoadNode(
@@ -112,7 +111,7 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
@JsonProperty("uri") String uri,
@JsonProperty("warehouse") String warehouse,
@JsonProperty("extList") List<HashMap<String, String>> extList,
- @JsonProperty("partitionFields") List<FieldInfo> partitionFields) {
+ @JsonProperty("partitionKey") String partitionKey) {
super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
@@ -121,7 +120,7 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
this.uri = uri;
this.warehouse = warehouse;
this.extList = extList;
- this.partitionFields = partitionFields;
+ this.partitionKey = partitionKey;
}
@Override
@@ -137,11 +136,7 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
options.put(HUDI_OPTION_HIVE_SYNC_METASTORE_URIS, uri);
// partition field
- if (partitionFields != null && !partitionFields.isEmpty()) {
- String partitionKey =
- partitionFields.stream()
- .map(FieldInfo::getName)
- .collect(Collectors.joining(","));
+ if (StringUtils.isNoneBlank(partitionKey)) {
options.put(HUDI_OPTION_PARTITION_PATH_FIELD_NAME, partitionKey);
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
index c3579177d..a8e64bcb3 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
@@ -51,6 +51,6 @@ public class HudiLoadNodeTest extends SerializeBaseTest<HudiLoadNode> {
"thrift://localhost:9083",
"hdfs://localhost:9000/user/hudi/warehouse",
new ArrayList<>(),
- new ArrayList<>());
+ "f1");
}
}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
index 8ddd3fd9b..87c820ca6 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import org.apache.commons.compress.utils.Lists;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -106,7 +105,7 @@ public class HudiNodeSqlParserTest extends AbstractTestBase {
null,
"hdfs://localhost:9000/hudi/warehouse",
extList,
- Lists.newArrayList());
+ "f1");
}
private HudiLoadNode buildHudiLoadNodeWithHiveCatalog() {
@@ -145,7 +144,7 @@ public class HudiNodeSqlParserTest extends AbstractTestBase {
"thrift://localhost:9083",
"/hive/warehouse",
extList,
- Lists.newArrayList());
+ "f1");
}
/**