Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/03 10:57:09 UTC

[inlong] branch branch-1.5 updated (0da9aafd6 -> ee17285f0)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from 0da9aafd6 [INLONG-7110][Release] Update changes log for the 1.5.0 version (#7125)
     new 659385a0b [INLONG-7061][Sort] Support table level metrics for Apache Doris connector and add dirty metrics (#7062)
     new b7c5c91e6 [INLONG-7104][Manager] Add database change script of 1.5.0 for InLongManager (#7105)
     new ee17285f0 [INLONG-7100][Manager][Sort][Dashboard] Support partition key in Hudi sink (#7101)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 inlong-dashboard/src/locales/cn.json               |  4 +-
 inlong-dashboard/src/locales/en.json               |  4 +-
 inlong-dashboard/src/metas/sinks/defaults/Hudi.ts  | 56 +++++-----------------
 .../src/main/assemblies/release.xml                |  3 ++
 .../inlong/manager/pojo/sink/hudi/HudiSink.java    |  2 +-
 .../inlong/manager/pojo/sink/hudi/HudiSinkDTO.java | 16 ++-----
 .../manager/pojo/sink/hudi/HudiSinkRequest.java    |  2 +-
 .../manager/pojo/sink/hudi/HudiTableInfo.java      |  2 +
 .../manager/pojo/sort/util/LoadNodeUtils.java      | 11 +----
 .../resource/sink/hudi/HudiCatalogClient.java      |  1 -
 .../service/sink/hudi/HudiSinkOperator.java        | 28 ++++++++++-
 inlong-manager/manager-web/sql/changes-1.5.0.sql   | 40 ++++++++++++++++
 .../sort/protocol/node/load/HudiLoadNode.java      | 15 ++----
 .../sort/protocol/node/load/HudiLoadNodeTest.java  |  2 +-
 .../sort/base/dirty/sink/s3/S3DirtySink.java       |  3 +-
 .../inlong/sort/base/dirty/sink/s3/S3Helper.java   |  2 +-
 .../sort/base/metric/sub/SinkTableMetricData.java  | 28 +++++++++++
 .../table/DorisDynamicSchemaOutputFormat.java      | 48 ++++++++++++++++++-
 .../inlong/sort/parser/HudiNodeSqlParserTest.java  |  5 +-
 19 files changed, 180 insertions(+), 92 deletions(-)
 create mode 100644 inlong-manager/manager-web/sql/changes-1.5.0.sql


[inlong] 02/03: [INLONG-7104][Manager] Add database change script of 1.5.0 for InLongManager (#7105)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit b7c5c91e6d323a9acf531a0afc52cd020d0628dc
Author: feat <fe...@outlook.com>
AuthorDate: Tue Jan 3 16:46:36 2023 +0800

    [INLONG-7104][Manager] Add database change script of 1.5.0 for InLongManager (#7105)
---
 .../src/main/assemblies/release.xml                |  3 ++
 inlong-manager/manager-web/sql/changes-1.5.0.sql   | 40 ++++++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git a/inlong-distribution/src/main/assemblies/release.xml b/inlong-distribution/src/main/assemblies/release.xml
index 0cef9e1e7..aeb0d376a 100644
--- a/inlong-distribution/src/main/assemblies/release.xml
+++ b/inlong-distribution/src/main/assemblies/release.xml
@@ -182,6 +182,9 @@
         <fileSet>
             <directory>../inlong-manager/manager-web/target/apache-inlong-manager-web-${project.version}-bin/sql</directory>
             <outputDirectory>../docker/docker-compose/sql</outputDirectory>
+            <excludes>
+                <exclude>changes-1.5.0.sql</exclude>
+            </excludes>
         </fileSet>
     </fileSets>
 
diff --git a/inlong-manager/manager-web/sql/changes-1.5.0.sql b/inlong-manager/manager-web/sql/changes-1.5.0.sql
new file mode 100644
index 000000000..4b655bc2c
--- /dev/null
+++ b/inlong-manager/manager-web/sql/changes-1.5.0.sql
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is the SQL change file from version 1.4.0 to the current version 1.5.0.
+-- When upgrading to version 1.5.0, please execute these SQL statements in the DB (such as MySQL) used by the Manager module.
+
+SET NAMES utf8mb4;
+SET FOREIGN_KEY_CHECKS = 0;
+
+USE `apache_inlong_manager`;
+
+
+ALTER TABLE `inlong_group`
+    ADD COLUMN `data_report_type` int(4) DEFAULT '0' COMMENT 'Data report type. 0: report to DataProxy and respond when the DataProxy has received the data. 1: report to DataProxy and respond after the DataProxy has sent the data. 2: report to MQ and respond when the MQ has received the data';
+
+
+ALTER TABLE `inlong_cluster_node`
+    ADD COLUMN `node_load` int(11) DEFAULT '-1' COMMENT 'Current load value of the node';
+
+
+ALTER TABLE `inlong_cluster_node`
+    ADD COLUMN `node_tags` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag, separated by commas, only uniquely identified by parent_id and ip';
+
+
+ALTER TABLE `stream_source`
+    ADD COLUMN `inlong_cluster_node_tag` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag';
\ No newline at end of file
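For reference, a minimal sketch of the upgrade step described in the script header, executed over plain JDBC instead of the usual route of piping changes-1.5.0.sql into the mysql client. The JDBC URL, user, and password are placeholders, a MySQL JDBC driver is assumed to be on the classpath, and the COMMENT clauses from the script are omitted for brevity.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ApplyManagerChanges150 {

        public static void main(String[] args) throws Exception {
            // The same ALTER statements as changes-1.5.0.sql, without the COMMENT clauses.
            String[] statements = {
                    "ALTER TABLE `inlong_group` ADD COLUMN `data_report_type` int(4) DEFAULT '0'",
                    "ALTER TABLE `inlong_cluster_node` ADD COLUMN `node_load` int(11) DEFAULT '-1'",
                    "ALTER TABLE `inlong_cluster_node` ADD COLUMN `node_tags` varchar(512) DEFAULT NULL",
                    "ALTER TABLE `stream_source` ADD COLUMN `inlong_cluster_node_tag` varchar(512) DEFAULT NULL"
            };
            // Placeholder connection settings; adjust host, port, user, and password as needed.
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/apache_inlong_manager", "root", "password");
                    Statement stmt = conn.createStatement()) {
                for (String sql : statements) {
                    stmt.execute(sql);
                }
            }
        }
    }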


[inlong] 03/03: [INLONG-7100][Manager][Sort][Dashboard] Support partition key in Hudi sink (#7101)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit ee17285f03280850e1b3511ba4374e6fc1a0f488
Author: feat <fe...@outlook.com>
AuthorDate: Tue Jan 3 16:47:08 2023 +0800

    [INLONG-7100][Manager][Sort][Dashboard] Support partition key in Hudi sink (#7101)
---
 inlong-dashboard/src/locales/cn.json               |  4 +-
 inlong-dashboard/src/locales/en.json               |  4 +-
 inlong-dashboard/src/metas/sinks/defaults/Hudi.ts  | 56 +++++-----------------
 .../inlong/manager/pojo/sink/hudi/HudiSink.java    |  2 +-
 .../inlong/manager/pojo/sink/hudi/HudiSinkDTO.java | 16 ++-----
 .../manager/pojo/sink/hudi/HudiSinkRequest.java    |  2 +-
 .../manager/pojo/sink/hudi/HudiTableInfo.java      |  2 +
 .../manager/pojo/sort/util/LoadNodeUtils.java      | 11 +----
 .../resource/sink/hudi/HudiCatalogClient.java      |  1 -
 .../service/sink/hudi/HudiSinkOperator.java        | 28 ++++++++++-
 .../sort/protocol/node/load/HudiLoadNode.java      | 15 ++----
 .../sort/protocol/node/load/HudiLoadNodeTest.java  |  2 +-
 .../inlong/sort/parser/HudiNodeSqlParserTest.java  |  5 +-
 13 files changed, 59 insertions(+), 89 deletions(-)

diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index d17d4a1ef..b778fb42d 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -204,9 +204,9 @@
   "meta.Sinks.Hudi.FieldType": "字段类型",
   "meta.Sinks.Hudi.FieldDescription": "字段描述",
   "meta.Sinks.Hudi.PrimaryKey": "主键",
-  "meta.Sinks.Hudi.PartitionFieldList": "分区字段",
+  "meta.Sinks.Hudi.PartitionKey": "分区字段",
   "meta.Sinks.Hudi.PrimaryKeyHelper": "主键字段,以逗号(,)分割",
-  "meta.Sinks.Hudi.PartitionFieldListHelp": "字段类型若为timestamp,则必须设置此字段值的格式,支持 MICROSECONDS,MILLISECONDS,SECONDS,SQL,ISO_8601,以及自定义,比如:yyyy-MM-dd HH:mm:ss 等",
+  "meta.Sinks.Hudi.PartitionKeyHelp": "分区字段列表,以英文逗号(,)分隔.",
   "meta.Sinks.Hudi.FieldFormat": "字段格式",
   "meta.Sinks.Hudi.ExtListHelper": "hudi表的DDL属性需带前缀'ddl.'",
   "meta.Sinks.Greenplum.TableName": "表名称",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index 8ba00e92b..b44a94ff5 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -205,8 +205,8 @@
   "meta.Sinks.Hudi.FieldDescription": "FieldDescription",
   "meta.Sinks.Hudi.PrimaryKey": "PrimaryKey",
   "meta.Sinks.Hudi.PrimaryKeyHelper": "The Primary key fields, separated by commas (,)",
-  "meta.Sinks.Hudi.PartitionFieldList": "PartitionFieldList",
-  "meta.Sinks.Hudi.PartitionFieldListHelp": "If the field type is timestamp, you must set the format of the field value, support MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601, and custom, such as: yyyy-MM-dd HH:mm:ss, etc.",
+  "meta.Sinks.Hudi.PartitionKey": "PartitionKey",
+  "meta.Sinks.Hudi.PartitionKeyHelp": "A list of partition fields, separated by commas.",
   "meta.Sinks.Hudi.FieldFormat": "FieldFormat",
   "meta.Sinks.Hudi.ExtListHelper": "The DDL attribute of the hudi table needs to be prefixed with 'ddl.'",
   "meta.Sinks.Greenplum.TableName": "TableName",
diff --git a/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts b/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts
index ad4648ac2..15f173a18 100644
--- a/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts
+++ b/inlong-dashboard/src/metas/sinks/defaults/Hudi.ts
@@ -262,50 +262,6 @@ export default class HudiSink extends SinkInfo implements DataWithBackend, Rende
   })
   sinkFieldList: Record<string, unknown>[];
 
-  @FieldDecorator({
-    type: EditableTable,
-    tooltip: i18n.t('meta.Sinks.Hudi.PartitionFieldListHelp'),
-    col: 24,
-    props: {
-      size: 'small',
-      required: false,
-      columns: [
-        {
-          title: i18n.t('meta.Sinks.Hudi.FieldName'),
-          dataIndex: 'fieldName',
-          rules: [{ required: true }],
-        },
-        {
-          title: i18n.t('meta.Sinks.Hudi.FieldType'),
-          dataIndex: 'fieldType',
-          type: 'select',
-          initialValue: 'string',
-          props: {
-            options: ['string', 'timestamp'].map(item => ({
-              label: item,
-              value: item,
-            })),
-          },
-        },
-        {
-          title: i18n.t('meta.Sinks.Hudi.FieldFormat'),
-          dataIndex: 'fieldFormat',
-          type: 'autocomplete',
-          props: {
-            options: ['MICROSECONDS', 'MILLISECONDS', 'SECONDS', 'SQL', 'ISO_8601'].map(item => ({
-              label: item,
-              value: item,
-            })),
-          },
-          rules: [{ required: true }],
-          visible: (text, record) => record.fieldType === 'timestamp',
-        },
-      ],
-    },
-  })
-  @I18n('meta.Sinks.Hudi.PartitionFieldList')
-  partitionFieldList: Record<string, unknown>[];
-
   @FieldDecorator({
     type: 'input',
     tooltip: i18n.t('meta.Sinks.Hudi.PrimaryKeyHelper'),
@@ -317,6 +273,18 @@ export default class HudiSink extends SinkInfo implements DataWithBackend, Rende
   @ColumnDecorator()
   @I18n('meta.Sinks.Hudi.PrimaryKey')
   primaryKey: string;
+
+  @FieldDecorator({
+    type: 'input',
+    tooltip: i18n.t('meta.Sinks.Hudi.PartitionKeyHelper'),
+    rules: [{ required: false }],
+    props: values => ({
+      disabled: [110, 130].includes(values?.status),
+    }),
+  })
+  @ColumnDecorator()
+  @I18n('meta.Sinks.Hudi.PartitionKey')
+  partitionKey: string;
 }
 
 const getFieldListColumns = sinkValues => {
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java
index 54e332f88..b23c1fad5 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java
@@ -77,7 +77,7 @@ public class HudiSink extends StreamSink {
     private List<HashMap<String, String>> extList;
 
     @ApiModelProperty("Partition field list")
-    private List<HudiPartitionField> partitionFieldList;
+    private String partitionKey;
 
     public HudiSink() {
         this.setSinkType(SinkType.HUDI);
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
index 616fc56f7..32b214060 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
@@ -26,7 +26,6 @@ import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.JsonUtils;
@@ -75,7 +74,7 @@ public class HudiSinkDTO {
     private List<HashMap<String, String>> extList;
 
     @ApiModelProperty("Partition field list")
-    private List<HudiPartitionField> partitionFieldList;
+    private String partitionKey;
 
     /**
      * Get the dto instance from the request
@@ -87,7 +86,7 @@ public class HudiSinkDTO {
                 .dbName(request.getDbName())
                 .tableName(request.getTableName())
                 .dataPath(request.getDataPath())
-                .partitionFieldList(request.getPartitionFieldList())
+                .partitionKey(request.getPartitionKey())
                 .fileFormat(request.getFileFormat())
                 .catalogType(request.getCatalogType())
                 .properties(request.getProperties())
@@ -112,16 +111,7 @@ public class HudiSinkDTO {
         tableInfo.setDbName(hudiInfo.getDbName());
         tableInfo.setTableName(hudiInfo.getTableName());
 
-        // Set partition fields
-        if (CollectionUtils.isNotEmpty(hudiInfo.getPartitionFieldList())) {
-            for (HudiPartitionField field : hudiInfo.getPartitionFieldList()) {
-                HudiColumnInfo columnInfo = new HudiColumnInfo();
-                columnInfo.setName(field.getFieldName());
-                columnInfo.setPartition(true);
-                columnInfo.setType("string");
-                columnList.add(columnInfo);
-            }
-        }
+        tableInfo.setPartitionKey(hudiInfo.getPartitionKey());
         tableInfo.setColumns(columnList);
         tableInfo.setPrimaryKey(hudiInfo.getPrimaryKey());
         tableInfo.setFileFormat(hudiInfo.getFileFormat());
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java
index a51d0a153..bd6c1833b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java
@@ -63,7 +63,7 @@ public class HudiSinkRequest extends SinkRequest {
     private List<HashMap<String, String>> extList;
 
     @ApiModelProperty("Partition field list")
-    private List<HudiPartitionField> partitionFieldList;
+    private String partitionKey;
 
     @ApiModelProperty("Primary key")
     private String primaryKey;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
index 24e4d7260..324e14ac4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
@@ -35,4 +35,6 @@ public class HudiTableInfo {
     private List<HudiColumnInfo> columns;
 
     private String primaryKey;
+
+    private String partitionKey;
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 240f208ca..d8f47eda3 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -409,14 +409,7 @@ public class LoadNodeUtils {
     public static HudiLoadNode createLoadNode(HudiSink hudiSink, List<FieldInfo> fieldInfos,
             List<FieldRelation> fieldRelations, Map<String, String> properties) {
         HudiConstant.CatalogType catalogType = HudiConstant.CatalogType.forName(hudiSink.getCatalogType());
-        List<FieldInfo> partitionFields = Lists.newArrayList();
-        if (CollectionUtils.isNotEmpty(hudiSink.getPartitionFieldList())) {
-            partitionFields = hudiSink.getPartitionFieldList().stream()
-                    .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hudiSink.getSinkName(),
-                            FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
-                                    partitionField.getFieldFormat())))
-                    .collect(Collectors.toList());
-        }
+
         return new HudiLoadNode(
                 hudiSink.getSinkName(),
                 hudiSink.getSinkName(),
@@ -433,7 +426,7 @@ public class LoadNodeUtils {
                 hudiSink.getCatalogUri(),
                 hudiSink.getWarehouse(),
                 hudiSink.getExtList(),
-                partitionFields);
+                hudiSink.getPartitionKey());
     }
 
     /**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
index 8055114ad..cff7995f7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
@@ -141,7 +141,6 @@ public class HudiCatalogClient {
                 // filter out the metadata columns
                 .filter(s -> !HoodieAvroUtils.isMetadataField(s.getName()))
                 .collect(Collectors.toList());
-        allCols.addAll(hiveTable.getPartitionKeys());
 
         return allCols.stream()
                 .map((FieldSchema s) -> {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
index 953137cc2..734697296 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
@@ -17,9 +17,12 @@
 
 package org.apache.inlong.manager.service.sink.hudi;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -72,7 +75,28 @@ public class HudiSinkOperator extends AbstractSinkOperator {
         Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
                 ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
         HudiSinkRequest sinkRequest = (HudiSinkRequest) request;
-        List<HashMap<String, String>> extList = sinkRequest.getExtList();
+
+        String partitionKey = sinkRequest.getPartitionKey();
+        String primaryKey = sinkRequest.getPrimaryKey();
+        boolean partitionKeyExist = StringUtils.isNotEmpty(partitionKey);
+        boolean primaryKeyExist = StringUtils.isNotEmpty(primaryKey);
+        if (primaryKeyExist || partitionKeyExist) {
+            Set<String> fieldNames = sinkRequest.getSinkFieldList().stream().map(SinkField::getFieldName)
+                    .collect(Collectors.toSet());
+            if (partitionKeyExist) {
+                checkState(
+                        fieldNames.contains(partitionKey),
+                        "The partitionKey(%s) must be included in the sinkFieldList(%s)",
+                        partitionKey, fieldNames);
+            }
+            if (primaryKeyExist) {
+                checkState(
+                        fieldNames.contains(primaryKey),
+                        "The primaryKey(%s) must be included in the sinkFieldList(%s)",
+                        primaryKey,
+                        fieldNames);
+            }
+        }
 
         try {
             HudiSinkDTO dto = HudiSinkDTO.getFromRequest(sinkRequest);
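To make the intent of the new guard above concrete, here is a small self-contained sketch of the rule it enforces: any configured primary key or partition key must name a field that exists in the sink field list. It uses only plain collections and Guava; the field names and keys are made up, standing in for the values carried by HudiSinkRequest.

    import static com.google.common.base.Preconditions.checkState;

    import com.google.common.collect.ImmutableSet;

    import java.util.Set;

    public class HudiKeyCheckSketch {

        public static void main(String[] args) {
            // Made-up sink field names and keys, standing in for getSinkFieldList(),
            // getPrimaryKey(), and getPartitionKey() on the Hudi sink request.
            Set<String> fieldNames = ImmutableSet.of("id", "name", "dt");
            String primaryKey = "id";
            String partitionKey = "dt";

            // The same containment rule enforced in HudiSinkOperator above,
            // using Guava's %s-style message templates.
            checkState(fieldNames.contains(primaryKey),
                    "The primaryKey(%s) must be included in the sinkFieldList(%s)", primaryKey, fieldNames);
            checkState(fieldNames.contains(partitionKey),
                    "The partitionKey(%s) must be included in the sinkFieldList(%s)", partitionKey, fieldNames);
            System.out.println("keys are valid for this sink field list");
        }
    }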
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
index b1448a82b..a150d0c40 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import lombok.Data;
@@ -92,8 +91,8 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
     @JsonProperty("extList")
     private List<HashMap<String, String>> extList;
 
-    @JsonProperty("partitionFields")
-    private List<FieldInfo> partitionFields;
+    @JsonProperty("partitionKey")
+    private String partitionKey;
 
     @JsonCreator
     public HudiLoadNode(
@@ -112,7 +111,7 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
             @JsonProperty("uri") String uri,
             @JsonProperty("warehouse") String warehouse,
             @JsonProperty("extList") List<HashMap<String, String>> extList,
-            @JsonProperty("partitionFields") List<FieldInfo> partitionFields) {
+            @JsonProperty("partitionKey") String partitionKey) {
         super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
         this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
         this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
@@ -121,7 +120,7 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
         this.uri = uri;
         this.warehouse = warehouse;
         this.extList = extList;
-        this.partitionFields = partitionFields;
+        this.partitionKey = partitionKey;
     }
 
     @Override
@@ -137,11 +136,7 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
         options.put(HUDI_OPTION_HIVE_SYNC_METASTORE_URIS, uri);
 
         // partition field
-        if (partitionFields != null && !partitionFields.isEmpty()) {
-            String partitionKey =
-                    partitionFields.stream()
-                            .map(FieldInfo::getName)
-                            .collect(Collectors.joining(","));
+        if (StringUtils.isNotBlank(partitionKey)) {
             options.put(HUDI_OPTION_PARTITION_PATH_FIELD_NAME, partitionKey);
         }
 
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
index c3579177d..a8e64bcb3 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
@@ -51,6 +51,6 @@ public class HudiLoadNodeTest extends SerializeBaseTest<HudiLoadNode> {
                 "thrift://localhost:9083",
                 "hdfs://localhost:9000/user/hudi/warehouse",
                 new ArrayList<>(),
-                new ArrayList<>());
+                "f1");
     }
 }
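As context for the HudiLoadNode change above, a standalone sketch (not taken from the connector) of how the comma-separated partitionKey string now flows straight into the Hudi write options instead of being joined from a List<FieldInfo>. The option key literal below is only a stand-in for HUDI_OPTION_PARTITION_PATH_FIELD_NAME, whose real value is defined by the connector constants.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.commons.lang3.StringUtils;

    public class PartitionKeyOptionSketch {

        // Illustrative stand-in for HUDI_OPTION_PARTITION_PATH_FIELD_NAME.
        private static final String PARTITION_PATH_FIELD_OPTION =
                "hoodie.datasource.write.partitionpath.field";

        public static void main(String[] args) {
            // A comma-separated partition key, as configured on the Hudi sink
            // (the updated HudiLoadNodeTest above uses the single field "f1").
            String partitionKey = "f1,f2";
            Map<String, String> options = new HashMap<>();
            if (StringUtils.isNotBlank(partitionKey)) {
                options.put(PARTITION_PATH_FIELD_OPTION, partitionKey);
            }
            System.out.println(options);
        }
    }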
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
index 8ddd3fd9b..87c820ca6 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import org.apache.commons.compress.utils.Lists;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -106,7 +105,7 @@ public class HudiNodeSqlParserTest extends AbstractTestBase {
                 null,
                 "hdfs://localhost:9000/hudi/warehouse",
                 extList,
-                Lists.newArrayList());
+                "f1");
     }
 
     private HudiLoadNode buildHudiLoadNodeWithHiveCatalog() {
@@ -145,7 +144,7 @@ public class HudiNodeSqlParserTest extends AbstractTestBase {
                 "thrift://localhost:9083",
                 "/hive/warehouse",
                 extList,
-                Lists.newArrayList());
+                "f1");
     }
 
     /**


[inlong] 01/03: [INLONG-7061][Sort] Support table level metrics for Apache Doris connector and add dirty metrics (#7062)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 659385a0bdad7b1eaf264273efe24c9731f33a5e
Author: Yizhou Yang <32...@users.noreply.github.com>
AuthorDate: Tue Jan 3 14:06:22 2023 +0800

    [INLONG-7061][Sort] Support table level metrics for Apache Doris connector and add dirty metrics (#7062)
---
 .../sort/base/dirty/sink/s3/S3DirtySink.java       |  3 +-
 .../inlong/sort/base/dirty/sink/s3/S3Helper.java   |  2 +-
 .../sort/base/metric/sub/SinkTableMetricData.java  | 28 +++++++++++++
 .../table/DorisDynamicSchemaOutputFormat.java      | 48 +++++++++++++++++++++-
 4 files changed, 78 insertions(+), 3 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
index ab8fc9464..b8f1f5f10 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
@@ -22,6 +22,7 @@ import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
@@ -244,7 +245,7 @@ public class S3DirtySink<T> implements DirtySink<T> {
         }
         String content = null;
         try {
-            content = StringUtils.join(values, s3Options.getLineDelimiter());
+            content = StringUtils.join(values, StringEscapeUtils.unescapeJava(s3Options.getLineDelimiter()));
             s3Helper.upload(identifier, content);
             LOGGER.info("Write {} records to s3 of identifier: {}", values.size(), identifier);
             writeOutNum.addAndGet(values.size());
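For the delimiter change in S3DirtySink above, a minimal standalone sketch (with made-up records, assuming the configured line delimiter arrives from the options as the escaped two-character literal "\n", as is typical for string options) of what unescapeJava adds before the records are joined:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.commons.lang.StringEscapeUtils;
    import org.apache.commons.lang3.StringUtils;

    public class LineDelimiterSketch {

        public static void main(String[] args) {
            // The delimiter as read from the options: a backslash followed by 'n',
            // not an actual newline character.
            String configuredDelimiter = "\\n";
            List<String> values = Arrays.asList("dirty-record-1", "dirty-record-2");

            // unescapeJava converts the escaped literal into a real newline before the join.
            String content = StringUtils.join(values, StringEscapeUtils.unescapeJava(configuredDelimiter));
            System.out.println(content); // the two records, now separated by an actual newline
        }
    }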
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
index d79b8aecd..f925d76e8 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
@@ -39,7 +39,7 @@ public class S3Helper implements Serializable {
     private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
 
     private static final int SEQUENCE_LENGTH = 4;
-    private static final String ESCAPE_PATTERN = "[\\pP\\p{Punct}\\s]";
+    private static final String ESCAPE_PATTERN = "[,,+=: ;()()。/.;]";
     private static final String FILE_NAME_SUFFIX = ".txt";
     private final Random r = new Random();
     private final AmazonS3 s3Client;
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
index cf5285b13..a5690a5b5 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
@@ -196,6 +196,34 @@ public class SinkTableMetricData extends SinkMetricData implements SinkSubMetric
         subSinkMetricData.invoke(rowCount, rowSize);
     }
 
+    /**
+     * output dirty metrics with estimate
+     *
+     * @param database the database name of record
+     * @param table the table name of record
+     * @param rowCount the row count of records
+     * @param rowSize the row size of records
+     */
+    public void outputDirtyMetricsWithEstimate(String database, String table, long rowCount,
+            long rowSize) {
+        if (StringUtils.isBlank(database) || StringUtils.isBlank(table)) {
+            invokeDirty(rowCount, rowSize);
+            return;
+        }
+        String identify = buildSchemaIdentify(database, null, table);
+        SinkMetricData subSinkMetricData;
+        if (subSinkMetricMap.containsKey(identify)) {
+            subSinkMetricData = subSinkMetricMap.get(identify);
+        } else {
+            subSinkMetricData = buildSubSinkMetricData(new String[]{database, table}, this);
+            subSinkMetricMap.put(identify, subSinkMetricData);
+        }
+        // report dirty records/bytes on both the overall sink metric and the per-table sub metric
+        this.invokeDirty(rowCount, rowSize);
+        subSinkMetricData.invokeDirty(rowCount, rowSize);
+    }
+
     public void outputMetricsWithEstimate(Object data) {
         long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
         invoke(1, size);
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 1b9bc4c48..efa04a630 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -76,6 +76,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -141,7 +143,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     private transient MetricState metricState;
     private final String[] fieldNames;
     private volatile boolean jsonFormat;
-    private String keysType;
     private volatile RowData.FieldGetter[] fieldGetters;
     private String fieldDelimiter;
     private String lineDelimiter;
@@ -267,6 +268,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
                 .withInlongAudit(auditHostAndPorts)
                 .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
                 .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
@@ -486,6 +489,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             }
             throw ex;
         }
+
+        if (multipleSink) {
+            handleMultipleDirtyData(dirtyData, dirtyType, e);
+            return;
+        }
+
         if (dirtySink != null) {
             DirtyData.Builder<Object> builder = DirtyData.builder();
             try {
@@ -503,6 +512,43 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
                 LOG.warn("Dirty sink failed", ex);
             }
         }
+        metricData.invokeDirty(1, dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
+    }
+
+    private void handleMultipleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e) {
+        JsonNode rootNode;
+        try {
+            rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+        } catch (Exception ex) {
+            handleDirtyData(dirtyData, DirtyType.DESERIALIZE_ERROR, e);
+            return;
+        }
+
+        if (dirtySink != null) {
+            DirtyData.Builder<Object> builder = DirtyData.builder();
+            try {
+                builder.setData(dirtyData)
+                        .setDirtyType(dirtyType)
+                        .setLabels(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels()))
+                        .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag()))
+                        .setDirtyMessage(e.getMessage())
+                        .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier()));
+                dirtySink.invoke(builder.build());
+            } catch (Exception ex) {
+                if (!dirtyOptions.ignoreSideOutputErrors()) {
+                    throw new RuntimeException(ex);
+                }
+                LOG.warn("Dirty sink failed", ex);
+            }
+        }
+        try {
+            metricData.outputDirtyMetricsWithEstimate(
+                    jsonDynamicSchemaFormat.parse(rootNode, databasePattern),
+                    jsonDynamicSchemaFormat.parse(rootNode, tablePattern), 1,
+                    ((RowData) dirtyData).getBinary(0).length);
+        } catch (Exception ex) {
+            metricData.invokeDirty(1, dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
+        }
     }
 
     private void handleColumnsChange(String tableIdentifier, JsonNode rootNode, JsonNode physicalData) {