You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2023/01/29 05:03:54 UTC

[inlong] branch master updated: [INLONG-7271][Manager] Support comma separation for primary key and partition key of Hudi table (#7272)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new afaff30d9 [INLONG-7271][Manager] Support comma separation for primary key and partition key of Hudi table (#7272)
afaff30d9 is described below

commit afaff30d9fec593f07bbb9edc83bf688475f596a
Author: feat <fe...@outlook.com>
AuthorDate: Sun Jan 29 13:03:49 2023 +0800

    [INLONG-7271][Manager] Support comma separation for primary key and partition key of Hudi table (#7272)
---
 .../manager/service/sink/hudi/HudiSinkOperator.java   | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
index e5613d4c9..6b3ff7d9c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
@@ -18,10 +18,15 @@
 package org.apache.inlong.manager.service.sink.hudi;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
+
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.FieldType;
@@ -77,20 +82,22 @@ public class HudiSinkOperator extends AbstractSinkOperator {
 
         String partitionKey = sinkRequest.getPartitionKey();
         String primaryKey = sinkRequest.getPrimaryKey();
-        boolean primaryKeyExist = StringUtils.isNotEmpty(partitionKey);
-        boolean partitionKeyExist = StringUtils.isNotEmpty(primaryKey);
+        boolean primaryKeyExist = StringUtils.isNotBlank(primaryKey);
+        boolean partitionKeyExist = StringUtils.isNotBlank(partitionKey);
         if (primaryKeyExist || partitionKeyExist) {
             Set<String> fieldNames = sinkRequest.getSinkFieldList().stream().map(SinkField::getFieldName)
                     .collect(Collectors.toSet());
-            if (primaryKeyExist) {
-                if (!fieldNames.contains(partitionKey)) {
+            if (partitionKeyExist) {
+                List<String> partitionKeys = Arrays.asList(partitionKey.split(InlongConstants.COMMA));
+                if (!CollectionUtils.isSubCollection(partitionKeys, fieldNames)) {
                     throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
                             String.format("The partitionKey(%s) must be included in the sinkFieldList(%s)",
                                     partitionKey, fieldNames));
                 }
             }
-            if (partitionKeyExist) {
-                if (!fieldNames.contains(primaryKey)) {
+            if (primaryKeyExist) {
+                List<String> primaryKeys = Arrays.asList(primaryKey.split(InlongConstants.COMMA));
+                if (!CollectionUtils.isSubCollection(primaryKeys, fieldNames)) {
                     throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
                             String.format("The primaryKey(%s) must be included in the sinkFieldList(%s)",
                                     primaryKey, fieldNames));