You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2023/01/29 05:03:54 UTC
[inlong] branch master updated: [INLONG-7271][Manager] Support comma separation for primary key and partition key of Hudi table (#7272)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new afaff30d9 [INLONG-7271][Manager] Support comma separation for primary key and partition key of Hudi table (#7272)
afaff30d9 is described below
commit afaff30d9fec593f07bbb9edc83bf688475f596a
Author: feat <fe...@outlook.com>
AuthorDate: Sun Jan 29 13:03:49 2023 +0800
[INLONG-7271][Manager] Support comma separation for primary key and partition key of Hudi table (#7272)
---
.../manager/service/sink/hudi/HudiSinkOperator.java | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
index e5613d4c9..6b3ff7d9c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
@@ -18,10 +18,15 @@
package org.apache.inlong.manager.service.sink.hudi;
import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.FieldType;
@@ -77,20 +82,22 @@ public class HudiSinkOperator extends AbstractSinkOperator {
String partitionKey = sinkRequest.getPartitionKey();
String primaryKey = sinkRequest.getPrimaryKey();
- boolean primaryKeyExist = StringUtils.isNotEmpty(partitionKey);
- boolean partitionKeyExist = StringUtils.isNotEmpty(primaryKey);
+ boolean primaryKeyExist = StringUtils.isNotBlank(primaryKey);
+ boolean partitionKeyExist = StringUtils.isNotBlank(partitionKey);
if (primaryKeyExist || partitionKeyExist) {
Set<String> fieldNames = sinkRequest.getSinkFieldList().stream().map(SinkField::getFieldName)
.collect(Collectors.toSet());
- if (primaryKeyExist) {
- if (!fieldNames.contains(partitionKey)) {
+ if (partitionKeyExist) {
+ List<String> partitionKeys = Arrays.asList(partitionKey.split(InlongConstants.COMMA));
+ if (!CollectionUtils.isSubCollection(partitionKeys, fieldNames)) {
throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
String.format("The partitionKey(%s) must be included in the sinkFieldList(%s)",
partitionKey, fieldNames));
}
}
- if (partitionKeyExist) {
- if (!fieldNames.contains(primaryKey)) {
+ if (primaryKeyExist) {
+ List<String> primaryKeys = Arrays.asList(primaryKey.split(InlongConstants.COMMA));
+ if (!CollectionUtils.isSubCollection(primaryKeys, fieldNames)) {
throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
String.format("The primaryKey(%s) must be included in the sinkFieldList(%s)",
primaryKey, fieldNames));