Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/04 11:40:31 UTC

[inlong] branch branch-1.5 updated (5c37f1efa -> c7af0fcc0)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from 5c37f1efa [INLONG-7126][Sort] Support multiple dirty sink archive helper (#7127)
     new f2fcf934a [INLONG-7133][Dashboard] The sink's data node supports jumping to the node page (#7137)
     new e5351f5c0 [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)
     new 6e6cadb9a [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)
     new c7af0fcc0 [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/components/NodeSelect/index.tsx            | 59 +++++++++++++++
 inlong-dashboard/src/locales/cn.json               |  8 +--
 inlong-dashboard/src/locales/en.json               |  8 +--
 .../src/metas/sinks/defaults/ClickHouse.ts         | 27 ++-----
 .../src/metas/sinks/defaults/Elasticsearch.ts      | 26 ++-----
 inlong-dashboard/src/metas/sinks/defaults/Hive.ts  | 27 ++-----
 .../src/metas/sinks/defaults/Iceberg.ts            | 27 ++-----
 inlong-dashboard/src/metas/sinks/defaults/MySQL.ts | 27 ++-----
 .../src/metas/sinks/defaults/StarRocks.ts          | 27 ++-----
 .../service/cluster/KafkaClusterOperator.java      | 25 +++++++
 .../service/cluster/TubeClusterOperator.java       | 29 +++++++-
 .../node/ck/ClickHouseDataNodeOperator.java        | 22 ++++++
 .../node/iceberg/IcebergDataNodeOperator.java      | 23 ++++++
 .../service/node/mysql/MySQLDataNodeOperator.java  | 23 ++++++
 .../node/starrocks/StarRocksDataNodeOperator.java  | 24 +++++++
 .../sort/base/metric/sub/SinkTableMetricData.java  | 13 ++++
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  2 +-
 .../sink/multiple/DynamicSchemaHandleOperator.java | 84 ++++++++++++++++++++--
 .../sink/multiple/IcebergMultipleStreamWriter.java | 60 +++-------------
 .../sink/multiple/IcebergSingleStreamWriter.java   | 10 ++-
 .../assigners/MySqlSnapshotSplitAssigner.java      | 24 ++++---
 21 files changed, 355 insertions(+), 220 deletions(-)
 create mode 100644 inlong-dashboard/src/components/NodeSelect/index.tsx


[inlong] 01/04: [INLONG-7133][Dashboard] The sink's data node supports jumping to the node page (#7137)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit f2fcf934a0dbfcadafc9020dbb65dcd080489f7b
Author: Lizhen <88...@users.noreply.github.com>
AuthorDate: Wed Jan 4 15:57:46 2023 +0800

    [INLONG-7133][Dashboard] The sink's data node supports jumping to the node page (#7137)
---
 .../src/components/NodeSelect/index.tsx            | 59 ++++++++++++++++++++++
 inlong-dashboard/src/locales/cn.json               |  8 +--
 inlong-dashboard/src/locales/en.json               |  8 +--
 .../src/metas/sinks/defaults/ClickHouse.ts         | 27 ++--------
 .../src/metas/sinks/defaults/Elasticsearch.ts      | 26 ++--------
 inlong-dashboard/src/metas/sinks/defaults/Hive.ts  | 27 ++--------
 .../src/metas/sinks/defaults/Iceberg.ts            | 27 ++--------
 inlong-dashboard/src/metas/sinks/defaults/MySQL.ts | 27 ++--------
 .../src/metas/sinks/defaults/StarRocks.ts          | 27 ++--------
 9 files changed, 87 insertions(+), 149 deletions(-)

diff --git a/inlong-dashboard/src/components/NodeSelect/index.tsx b/inlong-dashboard/src/components/NodeSelect/index.tsx
new file mode 100644
index 000000000..4e83225e7
--- /dev/null
+++ b/inlong-dashboard/src/components/NodeSelect/index.tsx
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import React from 'react';
+import HighSelect, { HighSelectProps } from '@/components/HighSelect';
+import i18n from '@/i18n';
+import { Link } from 'react-router-dom';
+
+export interface NodeSelectProps extends HighSelectProps {
+  nodeType: string;
+}
+
+const NodeSelect: React.FC<NodeSelectProps> = _props => {
+  const props: HighSelectProps = {
+    ..._props,
+    showSearch: true,
+    allowClear: true,
+    filterOption: false,
+    options: {
+      ..._props.options,
+      requestTrigger: ['onOpen', 'onSearch'],
+      requestService: keyword => ({
+        url: '/node/list',
+        method: 'POST',
+        data: {
+          keyword,
+          type: _props.nodeType,
+          pageNum: 1,
+          pageSize: 20,
+        },
+      }),
+      requestParams: {
+        formatResult: result =>
+          result?.list?.map(item => ({
+            label: item.name,
+            value: item.name,
+          })),
+      },
+    },
+    addonAfter: <Link to="/node">{i18n.t('components.NodeSelect.Create')}</Link>,
+  };
+  return <HighSelect {...props} />;
+};
+
+export default NodeSelect;
diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index b778fb42d..31a32a43f 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -112,6 +112,7 @@
   "meta.Sinks.Password": "密码",
   "meta.Sinks.EnableCreateResource": "是否创建资源",
   "meta.Sinks.EnableCreateResourceHelp": "如果库表已经存在,且无需修改,则选【不创建】,否则请选择【创建】,由系统自动创建资源。",
+  "meta.Sinks.DataNodeName": "数据节点",
   "meta.Sinks.Hive.FileFormat": "落地格式",
   "meta.Sinks.Hive.Day": "天",
   "meta.Sinks.Hive.DataEncoding": "数据编码",
@@ -135,7 +136,6 @@
   "meta.Sinks.Hive.FieldDescription": "字段描述",
   "meta.Sinks.Hive.IsMetaField": "是否为元字段",
   "meta.Sinks.Hive.FieldFormat": "字段格式",
-  "meta.Sinks.Hive.DataNodeName": "数据节点",
   "meta.Sinks.ClickHouse.DbName": "DB 名称",
   "meta.Sinks.ClickHouse.TableName": "表名称",
   "meta.Sinks.ClickHouse.FlushInterval": "刷新的间隔",
@@ -159,8 +159,6 @@
   "meta.Sinks.ClickHouse.PrimaryKey": "主键",
   "meta.Sinks.ClickHouse.CompressionCode": "压缩格式",
   "meta.Sinks.ClickHouse.TtlExpr": "生命周期",
-  "meta.Sinks.ClickHouse.DataNodeName": "数据节点",
-  "meta.Sinks.ES.DataNodeName": "数据节点",
   "meta.Sinks.ES.IndexName": "索引名称",
   "meta.Sinks.ES.FlushRecord": "刷新的数据条数",
   "meta.Sinks.ES.FlushRecordUnit": "条",
@@ -191,7 +189,6 @@
   "meta.Sinks.Iceberg.FieldType": "字段类型",
   "meta.Sinks.Iceberg.FieldDescription": "字段描述",
   "meta.Sinks.Iceberg.PartitionStrategy": "分区策略",
-  "meta.Sinks.Iceberg.DataNodeName": "数据节点",
   "meta.Sinks.Hudi.DbName": "DB 名称",
   "meta.Sinks.Hudi.TableName": "表名称",
   "meta.Sinks.Hudi.Warehouse": "仓库路径",
@@ -225,7 +222,6 @@
   "meta.Sinks.MySQL.IsMetaField": "是否为元字段",
   "meta.Sinks.MySQL.FieldFormat": "字段格式",
   "meta.Sinks.MySQL.FieldDescription": "字段描述",
-  "meta.Sinks.MySQL.DataNodeName": "数据节点",
   "meta.Sinks.Oracle.TableName": "表名称",
   "meta.Sinks.Oracle.PrimaryKey": "主键",
   "meta.Sinks.Oracle.FieldName": "字段名",
@@ -292,7 +288,6 @@
   "meta.Sinks.HBase.IsMetaField": "是否为元字段",
   "meta.Sinks.HBase.FieldFormat": "字段格式",
   "meta.Sinks.HBase.FieldDescription": "字段描述",
-  "meta.Sinks.StarRocks.DataNodeName": "数据节点",
   "meta.Sinks.StarRocks.TableName": "表名称",
   "meta.Sinks.StarRocks.PrimaryKey": "主键",
   "meta.Sinks.StarRocks.DatabaseName": "数据库名",
@@ -439,6 +434,7 @@
   "components.Layout.NavWidget.Remind": "密码不一致,请重新输入",
   "components.HighSelect.Customize": "自定义",
   "components.HighSelect.SearchPlaceholder": "请输入关键字搜索",
+  "components.NodeSelect.Create": "新建节点",
   "configs.pagination.Total": "共{{total}}项",
   "pages.GroupDashboard.ExecutionLogModal.Re-executingSuccess": "重新执行成功",
   "pages.GroupDashboard.ExecutionLogModal.TaskType": "任务类型",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index b44a94ff5..20eac2c6d 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -112,6 +112,7 @@
   "meta.Sinks.Password": "Password",
   "meta.Sinks.EnableCreateResource": "EnableCreateResource",
   "meta.Sinks.EnableCreateResourceHelp": "If the library table already exists and does not need to be modified, select [Do not create], otherwise select [Create], and the system will automatically create the resource.",
+  "meta.Sinks.DataNodeName": "DataNode",
   "meta.Sinks.Hive.FileFormat": "File format",
   "meta.Sinks.Hive.Day": "Day(s)",
   "meta.Sinks.Hive.DataEncoding": "Data encoding",
@@ -135,7 +136,6 @@
   "meta.Sinks.Hive.FieldDescription": "Field description",
   "meta.Sinks.Hive.IsMetaField": "IsMetaField",
   "meta.Sinks.Hive.FieldFormat": "FieldFormat",
-  "meta.Sinks.Hive.DataNodeName": "DataNode",
   "meta.Sinks.ClickHouse.DbName": "DbName",
   "meta.Sinks.ClickHouse.TableName": "TableName",
   "meta.Sinks.ClickHouse.FlushInterval": "FlushInterval",
@@ -159,8 +159,6 @@
   "meta.Sinks.ClickHouse.PrimaryKey": "PrimaryKey",
   "meta.Sinks.ClickHouse.CompressionCode": "CompressionCode",
   "meta.Sinks.ClickHouse.TtlExpr": "TtlExpr",
-  "meta.Sinks.ClickHouse.DataNodeName": "DataNode",
-  "meta.Sinks.ES.DataNodeName": "DataNode",
   "meta.Sinks.ES.IndexName": "IndexName",
   "meta.Sinks.ES.FlushRecord": "FlushRecord",
   "meta.Sinks.ES.FlushRecordUnit": "items",
@@ -191,7 +189,6 @@
   "meta.Sinks.Iceberg.FieldType": "FieldType",
   "meta.Sinks.Iceberg.FieldDescription": "FieldDescription",
   "meta.Sinks.Iceberg.PartitionStrategy": "PartitionStrategy",
-  "meta.Sinks.Iceberg.DataNodeName": "DataNode",
   "meta.Sinks.Hudi.DbName": "DbName",
   "meta.Sinks.Hudi.TableName": "TableName",
   "meta.Sinks.Hudi.Warehouse": "Warehouse",
@@ -225,7 +222,6 @@
   "meta.Sinks.MySQL.IsMetaField": "IsMetaField",
   "meta.Sinks.MySQL.FieldFormat": "FieldFormat",
   "meta.Sinks.MySQL.FieldDescription": "FieldDescription",
-  "meta.Sinks.MySQL.DataNodeName": "DataNode",
   "meta.Sinks.Oracle.TableName": "TableName",
   "meta.Sinks.Oracle.PrimaryKey": "PrimaryKey",
   "meta.Sinks.Oracle.FieldName": "FieldName",
@@ -292,7 +288,6 @@
   "meta.Sinks.HBase.IsMetaField": "IsMetaField",
   "meta.Sinks.HBase.FieldFormat": "FieldFormat",
   "meta.Sinks.HBase.FieldDescription": "FieldDescription",
-  "meta.Sinks.StarRocks.DataNodeName": "DataNode",
   "meta.Sinks.StarRocks.TableName": "TableName",
   "meta.Sinks.StarRocks.PrimaryKey": "PrimaryKey",
   "meta.Sinks.StarRocks.DatabaseName": "Database Name",
@@ -439,6 +434,7 @@
   "components.Layout.NavWidget.Remind": "Password does not match, please re-enter",
   "components.HighSelect.Customize": "Customize",
   "components.HighSelect.SearchPlaceholder": "Please enter keyword...",
+  "components.NodeSelect.Create": "Create Node",
   "configs.pagination.Total": "Total {{total}} items",
   "pages.GroupDashboard.ExecutionLogModal.Re-executingSuccess": "Re-executing success",
   "pages.GroupDashboard.ExecutionLogModal.TaskType": "Task type",
diff --git a/inlong-dashboard/src/metas/sinks/defaults/ClickHouse.ts b/inlong-dashboard/src/metas/sinks/defaults/ClickHouse.ts
index 42f1aad49..a701d38a7 100644
--- a/inlong-dashboard/src/metas/sinks/defaults/ClickHouse.ts
+++ b/inlong-dashboard/src/metas/sinks/defaults/ClickHouse.ts
@@ -24,6 +24,7 @@ import i18n from '@/i18n';
 import EditableTable from '@/components/EditableTable';
 import { SinkInfo } from '../common/SinkInfo';
 import { sourceFields } from '../common/sourceFields';
+import NodeSelect from '@/components/NodeSelect';
 
 const { I18n } = DataWithBackend;
 const { FieldDecorator } = RenderRow;
@@ -93,34 +94,14 @@ export default class ClickHouseSink
   enableCreateResource: number;
 
   @FieldDecorator({
-    type: 'select',
+    type: NodeSelect,
     rules: [{ required: true }],
     props: values => ({
-      showSearch: true,
       disabled: [110, 130].includes(values?.status),
-      options: {
-        requestTrigger: ['onOpen', 'onSearch'],
-        requestService: keyword => ({
-          url: '/node/list',
-          method: 'POST',
-          data: {
-            keyword,
-            type: 'CLICKHOUSE',
-            pageNum: 1,
-            pageSize: 20,
-          },
-        }),
-        requestParams: {
-          formatResult: result =>
-            result?.list?.map(item => ({
-              label: item.name,
-              value: item.name,
-            })),
-        },
-      },
+      nodeType: 'CLICKHOUSE',
     }),
   })
-  @I18n('meta.Sinks.ClickHouse.DataNodeName')
+  @I18n('meta.Sinks.DataNodeName')
   dataNodeName: string;
 
   @FieldDecorator({
diff --git a/inlong-dashboard/src/metas/sinks/defaults/Elasticsearch.ts b/inlong-dashboard/src/metas/sinks/defaults/Elasticsearch.ts
index 0bc5ce6a0..857a71158 100644
--- a/inlong-dashboard/src/metas/sinks/defaults/Elasticsearch.ts
+++ b/inlong-dashboard/src/metas/sinks/defaults/Elasticsearch.ts
@@ -24,6 +24,7 @@ import i18n from '@/i18n';
 import EditableTable from '@/components/EditableTable';
 import { sourceFields } from '../common/sourceFields';
 import { SinkInfo } from '../common/SinkInfo';
+import NodeSelect from '@/components/NodeSelect';
 
 const { I18n } = DataWithBackend;
 const { FieldDecorator } = RenderRow;
@@ -52,33 +53,14 @@ export default class ElasticsearchSink
   implements DataWithBackend, RenderRow, RenderList
 {
   @FieldDecorator({
-    type: 'select',
+    type: NodeSelect,
     rules: [{ required: true }],
     props: values => ({
-      showSearch: true,
       disabled: [110, 130].includes(values?.status),
-      options: {
-        requestTrigger: ['onOpen', 'onSearch'],
-        requestService: {
-          url: '/node/list',
-          method: 'POST',
-          data: {
-            type: 'ELASTICSEARCH',
-            pageNum: 1,
-            pageSize: 20,
-          },
-        },
-        requestParams: {
-          formatResult: result =>
-            result?.list?.map(item => ({
-              label: item.name,
-              value: item.name,
-            })),
-        },
-      },
+      nodeType: 'ELASTICSEARCH',
     }),
   })
-  @I18n('meta.Sinks.ES.DataNodeName')
+  @I18n('meta.Sinks.DataNodeName')
   dataNodeName: string;
 
   @FieldDecorator({
diff --git a/inlong-dashboard/src/metas/sinks/defaults/Hive.ts b/inlong-dashboard/src/metas/sinks/defaults/Hive.ts
index 8cdd371c2..76d23af46 100644
--- a/inlong-dashboard/src/metas/sinks/defaults/Hive.ts
+++ b/inlong-dashboard/src/metas/sinks/defaults/Hive.ts
@@ -24,6 +24,7 @@ import i18n from '@/i18n';
 import EditableTable from '@/components/EditableTable';
 import { SinkInfo } from '../common/SinkInfo';
 import { sourceFields } from '../common/sourceFields';
+import NodeSelect from '@/components/NodeSelect';
 
 const { I18n } = DataWithBackend;
 const { FieldDecorator } = RenderRow;
@@ -117,34 +118,14 @@ export default class HiveSink extends SinkInfo implements DataWithBackend, Rende
   password: string;
 
   @FieldDecorator({
-    type: 'select',
+    type: NodeSelect,
     rules: [{ required: true }],
     props: values => ({
-      showSearch: true,
       disabled: [110, 130].includes(values?.status),
-      options: {
-        requestTrigger: ['onOpen', 'onSearch'],
-        requestService: keyword => ({
-          url: '/node/list',
-          method: 'POST',
-          data: {
-            keyword,
-            type: 'HIVE',
-            pageNum: 1,
-            pageSize: 20,
-          },
-        }),
-        requestParams: {
-          formatResult: result =>
-            result?.list?.map(item => ({
-              label: item.name,
-              value: item.name,
-            })),
-        },
-      },
+      nodeType: 'HIVE',
     }),
   })
-  @I18n('meta.Sinks.Hive.DataNodeName')
+  @I18n('meta.Sinks.DataNodeName')
   dataNodeName: string;
 
   @FieldDecorator({
diff --git a/inlong-dashboard/src/metas/sinks/defaults/Iceberg.ts b/inlong-dashboard/src/metas/sinks/defaults/Iceberg.ts
index fe033eda9..f7d06190a 100644
--- a/inlong-dashboard/src/metas/sinks/defaults/Iceberg.ts
+++ b/inlong-dashboard/src/metas/sinks/defaults/Iceberg.ts
@@ -24,6 +24,7 @@ import i18n from '@/i18n';
 import EditableTable from '@/components/EditableTable';
 import { sourceFields } from '../common/sourceFields';
 import { SinkInfo } from '../common/SinkInfo';
+import NodeSelect from '@/components/NodeSelect';
 
 const { I18n } = DataWithBackend;
 const { FieldDecorator } = RenderRow;
@@ -154,34 +155,14 @@ export default class IcebergSink
   enableCreateResource: number;
 
   @FieldDecorator({
-    type: 'select',
+    type: NodeSelect,
     rules: [{ required: true }],
     props: values => ({
-      showSearch: true,
       disabled: [110, 130].includes(values?.status),
-      options: {
-        requestTrigger: ['onOpen', 'onSearch'],
-        requestService: keyword => ({
-          url: '/node/list',
-          method: 'POST',
-          data: {
-            keyword,
-            type: 'ICEBERG',
-            pageNum: 1,
-            pageSize: 20,
-          },
-        }),
-        requestParams: {
-          formatResult: result =>
-            result?.list?.map(item => ({
-              label: item.name,
-              value: item.name,
-            })),
-        },
-      },
+      nodeType: 'ICEBERG',
     }),
   })
-  @I18n('meta.Sinks.Iceberg.DataNodeName')
+  @I18n('meta.Sinks.DataNodeName')
   dataNodeName: string;
 
   @FieldDecorator({
diff --git a/inlong-dashboard/src/metas/sinks/defaults/MySQL.ts b/inlong-dashboard/src/metas/sinks/defaults/MySQL.ts
index ac63f8746..d53acd518 100644
--- a/inlong-dashboard/src/metas/sinks/defaults/MySQL.ts
+++ b/inlong-dashboard/src/metas/sinks/defaults/MySQL.ts
@@ -22,6 +22,7 @@ import i18n from '@/i18n';
 import EditableTable from '@/components/EditableTable';
 import { sourceFields } from '../common/sourceFields';
 import { SinkInfo } from '../common/SinkInfo';
+import NodeSelect from '@/components/NodeSelect';
 
 const { I18n } = DataWithBackend;
 const { FieldDecorator } = RenderRow;
@@ -108,34 +109,14 @@ export default class HiveSink extends SinkInfo implements DataWithBackend, Rende
   enableCreateResource: number;
 
   @FieldDecorator({
-    type: 'select',
+    type: NodeSelect,
     rules: [{ required: true }],
     props: values => ({
-      showSearch: true,
       disabled: [110, 130].includes(values?.status),
-      options: {
-        requestTrigger: ['onOpen', 'onSearch'],
-        requestService: keyword => ({
-          url: '/node/list',
-          method: 'POST',
-          data: {
-            keyword,
-            type: 'MYSQL',
-            pageNum: 1,
-            pageSize: 20,
-          },
-        }),
-        requestParams: {
-          formatResult: result =>
-            result?.list?.map(item => ({
-              label: item.name,
-              value: item.name,
-            })),
-        },
-      },
+      nodeType: 'MYSQL',
     }),
   })
-  @I18n('meta.Sinks.MySQL.DataNodeName')
+  @I18n('meta.Sinks.DataNodeName')
   dataNodeName: string;
 
   @FieldDecorator({
diff --git a/inlong-dashboard/src/metas/sinks/defaults/StarRocks.ts b/inlong-dashboard/src/metas/sinks/defaults/StarRocks.ts
index c0fde2545..82823a0ed 100644
--- a/inlong-dashboard/src/metas/sinks/defaults/StarRocks.ts
+++ b/inlong-dashboard/src/metas/sinks/defaults/StarRocks.ts
@@ -22,6 +22,7 @@ import i18n from '@/i18n';
 import EditableTable from '@/components/EditableTable';
 import { sourceFields } from '../common/sourceFields';
 import { SinkInfo } from '../common/SinkInfo';
+import NodeSelect from '@/components/NodeSelect';
 
 const { I18n } = DataWithBackend;
 const { FieldDecorator } = RenderRow;
@@ -61,34 +62,14 @@ export default class StarRocksSink
   implements DataWithBackend, RenderRow, RenderList
 {
   @FieldDecorator({
-    type: 'select',
+    type: NodeSelect,
     rules: [{ required: true }],
     props: values => ({
-      showSearch: true,
       disabled: [110, 130].includes(values?.status),
-      options: {
-        requestTrigger: ['onOpen', 'onSearch'],
-        requestService: keyword => ({
-          url: '/node/list',
-          method: 'POST',
-          data: {
-            keyword,
-            type: 'STARROCKS',
-            pageNum: 1,
-            pageSize: 20,
-          },
-        }),
-        requestParams: {
-          formatResult: result =>
-            result?.list?.map(item => ({
-              label: item.name,
-              value: item.name,
-            })),
-        },
-      },
+      nodeType: 'STARROCKS',
     }),
   })
-  @I18n('meta.Sinks.StarRocks.DataNodeName')
+  @I18n('meta.Sinks.DataNodeName')
   dataNodeName: string;
 
   @FieldDecorator({


[inlong] 03/04: [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 6e6cadb9aca56869f27e2f5efd9cb9d054655d31
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Wed Jan 4 17:24:03 2023 +0800

    [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)
    
    Co-authored-by: stingpeng <st...@tencent.com>
---
 .../assigners/MySqlSnapshotSplitAssigner.java      | 24 +++++++++++++---------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index 498edec76..d270fa792 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -205,17 +205,21 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
 
             executor.submit(
                     () -> {
-                        Iterator<TableId> iterator = remainingTables.iterator();
-                        while (iterator.hasNext()) {
-                            TableId nextTable = iterator.next();
-                            // split the given table into chunks (snapshot splits)
-                            Collection<MySqlSnapshotSplit> splits =
-                                    chunkSplitter.generateSplits(nextTable);
-                            synchronized (lock) {
-                                remainingSplits.addAll(splits);
-                                remainingTables.remove(nextTable);
-                                lock.notify();
+                        try {
+                            Iterator<TableId> iterator = remainingTables.iterator();
+                            while (iterator.hasNext()) {
+                                TableId nextTable = iterator.next();
+                                // split the given table into chunks (snapshot splits)
+                                Collection<MySqlSnapshotSplit> splits =
+                                        chunkSplitter.generateSplits(nextTable);
+                                synchronized (lock) {
+                                    remainingSplits.addAll(splits);
+                                    remainingTables.remove(nextTable);
+                                    lock.notify();
+                                }
                             }
+                        } catch (Exception e) {
+                            LOG.error("asynchronously split exit with exception", e);
                         }
                     });
         }
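
Note: the change above wraps the asynchronous chunk-splitting loop in a try/catch so an exception in the splitter no longer kills the executor task silently. A minimal, self-contained sketch of that pattern follows (plain JDK executor and placeholder splitting logic, not the actual MySqlSnapshotSplitAssigner; table and split names are made up for illustration):

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class GuardedSplitTask {

        private static final List<String> REMAINING_TABLES =
                new CopyOnWriteArrayList<>(List.of("db.t1", "db.t2"));
        private static final List<String> REMAINING_SPLITS = new CopyOnWriteArrayList<>();
        private static final Object LOCK = new Object();

        public static void main(String[] args) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.submit(() -> {
                // Guard the whole loop: without the try/catch an exception here would
                // only surface via the Future, and a caller waiting on LOCK could hang.
                try {
                    for (String table : REMAINING_TABLES) {
                        List<String> splits = generateSplits(table); // may throw
                        synchronized (LOCK) {
                            REMAINING_SPLITS.addAll(splits);
                            REMAINING_TABLES.remove(table);
                            LOCK.notify();
                        }
                    }
                } catch (Exception e) {
                    // Log and exit the task instead of dying silently.
                    System.err.println("asynchronously split exit with exception: " + e);
                }
            });
            executor.shutdown();
        }

        // Placeholder for chunkSplitter.generateSplits(nextTable).
        private static List<String> generateSplits(String table) {
            return List.of(table + "#split-0", table + "#split-1");
        }
    }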


[inlong] 04/04: [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit c7af0fcc0790d10ba0d4180c342aae8715a13f32
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Wed Jan 4 18:48:49 2023 +0800

    [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)
---
 .../service/cluster/KafkaClusterOperator.java      | 25 +++++++++++++++++++
 .../service/cluster/TubeClusterOperator.java       | 29 ++++++++++++++++++++--
 .../node/ck/ClickHouseDataNodeOperator.java        | 22 ++++++++++++++++
 .../node/iceberg/IcebergDataNodeOperator.java      | 23 +++++++++++++++++
 .../service/node/mysql/MySQLDataNodeOperator.java  | 23 +++++++++++++++++
 .../node/starrocks/StarRocksDataNodeOperator.java  | 24 ++++++++++++++++++
 6 files changed, 144 insertions(+), 2 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
index 8c4eb37b7..dceac1864 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
@@ -23,17 +23,24 @@ import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterDTO;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterRequest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.Properties;
+
 /**
  * Kafka cluster operator.
  */
@@ -86,4 +93,22 @@ public class KafkaClusterOperator extends AbstractClusterOperator {
         }
     }
 
+    @Override
+    public Boolean testConnection(ClusterRequest request) {
+        String bootstrapServers = request.getUrl();
+        Preconditions.checkNotNull(bootstrapServers, "connection url cannot be empty");
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        try (Admin ignored = Admin.create(props)) {
+            ListTopicsResult topics = ignored.listTopics(new ListTopicsOptions().timeoutMs(30000));
+            topics.names().get();
+            LOGGER.info("kafka connection not null - connection success for bootstrapServers={}", topics);
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("kafka connection failed for bootstrapServers=%s", bootstrapServers);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
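
Note: the Kafka check above amounts to creating an Admin client and listing topics with a timeout. A rough standalone sketch of that pattern (requires the kafka-clients dependency; the bootstrap address is a placeholder and the failure is rethrown as a plain RuntimeException rather than InLong's BusinessException):

    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListTopicsOptions;

    public class KafkaConnectionCheck {

        public static boolean testConnection(String bootstrapServers) {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            // Try-with-resources closes the Admin client even when the check fails.
            try (Admin admin = Admin.create(props)) {
                // names().get() blocks until the broker answers or the timeout expires.
                admin.listTopics(new ListTopicsOptions().timeoutMs(30_000)).names().get();
                return true;
            } catch (Exception e) {
                throw new RuntimeException(
                        "kafka connection failed for bootstrapServers=" + bootstrapServers, e);
            }
        }

        public static void main(String[] args) {
            System.out.println(testConnection("localhost:9092")); // placeholder address
        }
    }
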
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
index cb24bd741..b3cf872fa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
@@ -19,22 +19,27 @@ package org.apache.inlong.manager.service.cluster;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.HttpUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterDTO;
 import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterRequest;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.service.group.InlongGroupOperator4NoneMQ;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * TubeMQ cluster operator.
  */
@@ -86,4 +91,24 @@ public class TubeClusterOperator extends AbstractClusterOperator {
         return tubeClusterInfo;
     }
 
+    @Override
+    public Boolean testConnection(ClusterRequest request) {
+        String masterUrl = request.getUrl();
+        int hostBeginIndex = masterUrl.lastIndexOf(InlongConstants.SLASH);
+        int portBeginIndex = masterUrl.lastIndexOf(InlongConstants.COLON);
+        String host = masterUrl.substring(hostBeginIndex + 1, portBeginIndex);
+        int port = Integer.parseInt(masterUrl.substring(portBeginIndex + 1));
+        Preconditions.checkNotNull(masterUrl, "connection url cannot be empty");
+        boolean result;
+        try {
+            result = HttpUtils.checkConnectivity(host, port, 10, TimeUnit.SECONDS);
+            LOGGER.info("tube connection not null - connection success for masterUrl={}", masterUrl);
+            return result;
+        } catch (Exception e) {
+            String errMsg = String.format("tube connection failed for masterUrl=%s", masterUrl);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
index 08e15de38..7f386416b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,11 +31,14 @@ import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeDTO;
 import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeInfo;
 import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeRequest;
 import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseJdbcUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.sql.Connection;
+
 @Service
 public class ClickHouseDataNodeOperator extends AbstractDataNodeOperator {
 
@@ -82,4 +86,22 @@ public class ClickHouseDataNodeOperator extends AbstractDataNodeOperator {
         }
     }
 
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String url = request.getUrl();
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.checkNotNull(url, "connection url cannot be empty");
+        try (Connection ignored = ClickHouseJdbcUtils.getConnection(url, username, password)) {
+            LOGGER.info("clickhouse connection not null - connection success for url={}, username={}, password={}", url,
+                    username, password);
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("clickhouse connection failed for url=%s, username=%s, password=%s", url,
+                    username, password);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
index 991764610..74506feaa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.service.node.iceberg;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,6 +32,7 @@ import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeDTO;
 import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeInfo;
 import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeRequest;
 import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.iceberg.IcebergCatalogUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -83,4 +86,24 @@ public class IcebergDataNodeOperator extends AbstractDataNodeOperator {
         }
     }
 
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        IcebergDataNodeRequest icebergDataNodeRequest = (IcebergDataNodeRequest) request;
+        String metastoreUri = icebergDataNodeRequest.getUrl();
+        String warehouse = icebergDataNodeRequest.getWarehouse();
+        Preconditions.checkNotNull(metastoreUri, "connection url cannot be empty");
+        try {
+            HiveCatalog catalog = IcebergCatalogUtils.getCatalog(metastoreUri, warehouse);
+            catalog.listNamespaces();
+            LOGGER.info("iceberg connection not null - connection success for metastoreUri={}, warehouse={}",
+                    metastoreUri, warehouse);
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("iceberg connection failed for metastoreUri=%s, warhouse=%s", metastoreUri,
+                    warehouse);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
index aacd09dd9..9848b23c3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,11 +31,14 @@ import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeDTO;
 import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeInfo;
 import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeRequest;
 import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.mysql.MySQLJdbcUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.sql.Connection;
+
 /**
  * MySQL data node operator
  */
@@ -86,4 +90,23 @@ public class MySQLDataNodeOperator extends AbstractDataNodeOperator {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
         }
     }
+
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String jdbcUrl = request.getUrl();
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.checkNotNull(jdbcUrl, "connection jdbcUrl cannot be empty");
+        try (Connection ignored = MySQLJdbcUtils.getConnection(jdbcUrl, username, password)) {
+            LOGGER.info("mysql connection not null - connection success for jdbcUrl={}, username={}, password={}",
+                    jdbcUrl, username, password);
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("mysql connection failed for jdbcUrl=%s, username=%s, password=%s", jdbcUrl,
+                    username, password);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java
index fb82d894c..e4249ac08 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/starrocks/StarRocksDataNodeOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,11 +31,14 @@ import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeDTO;
 import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo;
 import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeRequest;
 import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.starrocks.StarRocksJdbcUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.sql.Connection;
+
 @Service
 public class StarRocksDataNodeOperator extends AbstractDataNodeOperator {
 
@@ -83,4 +87,24 @@ public class StarRocksDataNodeOperator extends AbstractDataNodeOperator {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
         }
     }
+
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String jdbcUrl = request.getUrl();
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.checkNotNull(jdbcUrl, "connection jdbcUrl cannot be empty");
+        try (Connection ignored = StarRocksJdbcUtils.getConnection(jdbcUrl, username, password)) {
+            LOGGER.info("starRocks connection not null - connection success for jdbcUrl={}, username={}, password={}",
+                    jdbcUrl, username, password);
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("starRocks connection failed for jdbcUrl=%s, username=%s, password=%s",
+                    jdbcUrl,
+                    username, password);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
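
Note: the ClickHouse, MySQL and StarRocks operators above all follow the same shape: open a probe connection in try-with-resources, return true on success, and convert any failure into a BusinessException. A rough equivalent using only java.sql.DriverManager (the real code goes through InLong's ClickHouseJdbcUtils/MySQLJdbcUtils/StarRocksJdbcUtils helpers; URL and credentials below are placeholders, and a matching JDBC driver must be on the classpath):

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class JdbcConnectionCheck {

        public static boolean testConnection(String jdbcUrl, String username, String password) {
            if (jdbcUrl == null || jdbcUrl.isEmpty()) {
                throw new IllegalArgumentException("connection jdbcUrl cannot be empty");
            }
            // Try-with-resources guarantees the probe connection is closed again.
            try (Connection ignored = DriverManager.getConnection(jdbcUrl, username, password)) {
                return true;
            } catch (Exception e) {
                throw new RuntimeException("connection failed for jdbcUrl=" + jdbcUrl, e);
            }
        }

        public static void main(String[] args) {
            System.out.println(testConnection("jdbc:mysql://localhost:3306/test", "root", "secret"));
        }
    }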


[inlong] 02/04: [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit e5351f5c00f28e730b178a2d64b6e99b4d449866
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Wed Jan 4 17:01:31 2023 +0800

    [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)
---
 .../sort/base/metric/sub/SinkTableMetricData.java  | 13 ++++
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  2 +-
 .../sink/multiple/DynamicSchemaHandleOperator.java | 84 ++++++++++++++++++++--
 .../sink/multiple/IcebergMultipleStreamWriter.java | 60 +++-------------
 .../sink/multiple/IcebergSingleStreamWriter.java   | 10 ++-
 5 files changed, 110 insertions(+), 59 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
index a5690a5b5..842584ba7 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
@@ -256,6 +256,19 @@ public class SinkTableMetricData extends SinkMetricData implements SinkSubMetric
         subSinkMetricData.invokeDirty(rowCount, rowSize);
     }
 
+    /**
+     * output dirty metrics with estimate
+     *
+     * @param database the database name of record
+     * @param schema the schema name of record
+     * @param table the table name of record
+     * @param data the dirty data
+     */
+    public void outputDirtyMetricsWithEstimate(String database, String schema, String table, Object data) {
+        long size = data == null ? 0L : data.toString().getBytes(StandardCharsets.UTF_8).length;
+        outputDirtyMetrics(database, schema, table, 1, size);
+    }
+
     public void outputDirtyMetricsWithEstimate(Object data) {
         long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
         invokeDirty(1, size);
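
Note: the new table-level overload above estimates dirty traffic as one record whose byte size is the UTF-8 length of the payload's string form, attributed to a database/schema/table key. A tiny standalone sketch of that bookkeeping (plain JDK maps and counters standing in for SinkTableMetricData and its sub-metrics; not the actual InLong classes):

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;

    public class DirtyMetricEstimate {

        // One pair of counters per "database.schema.table" key.
        private static final Map<String, LongAdder> DIRTY_RECORDS = new ConcurrentHashMap<>();
        private static final Map<String, LongAdder> DIRTY_BYTES = new ConcurrentHashMap<>();

        public static void outputDirtyMetricsWithEstimate(
                String database, String schema, String table, Object data) {
            long size = data == null ? 0L : data.toString().getBytes(StandardCharsets.UTF_8).length;
            String key = database + "." + schema + "." + table;
            DIRTY_RECORDS.computeIfAbsent(key, k -> new LongAdder()).increment();
            DIRTY_BYTES.computeIfAbsent(key, k -> new LongAdder()).add(size);
        }

        public static void main(String[] args) {
            outputDirtyMetricsWithEstimate("db", null, "orders", "{\"id\": 1}");
            System.out.println(DIRTY_RECORDS + " " + DIRTY_BYTES);
        }
    }
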
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index a5f070522..b0ed02abe 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -548,7 +548,7 @@ public class FlinkSink {
 
             int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
             DynamicSchemaHandleOperator routeOperator = new DynamicSchemaHandleOperator(
-                    catalogLoader, multipleSinkOption, dirtyOptions, dirtySink);
+                    catalogLoader, multipleSinkOption, dirtyOptions, dirtySink, inlongMetric, auditHostAndPorts);
             SingleOutputStreamOperator<RecordWithSchema> routeStream = input
                     .transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME),
                             TypeInformation.of(RecordWithSchema.class),
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 37bb6f944..53fef8dd1 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -17,6 +17,12 @@
 
 package org.apache.inlong.sort.iceberg.sink.multiple;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -43,9 +49,14 @@ import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
 import org.apache.inlong.sort.base.sink.TableChange;
 import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +72,12 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWithSchema>
         implements
             OneInputStreamOperator<RowData, RecordWithSchema>,
@@ -90,13 +107,22 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
     private final DirtyOptions dirtyOptions;
     private @Nullable final DirtySink<Object> dirtySink;
 
+    // metric
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    private @Nullable transient SinkTableMetricData metricData;
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
+
     public DynamicSchemaHandleOperator(CatalogLoader catalogLoader,
             MultipleSinkOption multipleSinkOption, DirtyOptions dirtyOptions,
-            @Nullable DirtySink<Object> dirtySink) {
+            @Nullable DirtySink<Object> dirtySink, String inlongMetric, String auditHostAndPorts) {
         this.catalogLoader = catalogLoader;
         this.multipleSinkOption = multipleSinkOption;
         this.dirtyOptions = dirtyOptions;
         this.dirtySink = dirtySink;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
     }
 
     @SuppressWarnings("unchecked")
@@ -117,6 +143,20 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
         this.recordQueues = new HashMap<>();
         this.schemaCache = new HashMap<>();
         this.blacklist = new HashSet<>();
+
+        // Initialize metric
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SinkTableMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
     }
 
     @Override
@@ -136,14 +176,15 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
             LOGGER.error(String.format("Deserialize error, raw data: %s",
                     new String(element.getValue().getBinary(0))), e);
             handleDirtyData(new String(element.getValue().getBinary(0)),
-                    null, DirtyType.DESERIALIZE_ERROR, e);
+                    null, DirtyType.DESERIALIZE_ERROR, e, TableIdentifier.of("unknow", "unknow"));
         }
         TableIdentifier tableId = null;
         try {
             tableId = parseId(jsonNode);
         } catch (Exception e) {
             LOGGER.error(String.format("Table identifier parse error, raw data: %s", jsonNode), e);
-            handleDirtyData(jsonNode, jsonNode, DirtyType.TABLE_IDENTIFIER_PARSE_ERROR, e);
+            handleDirtyData(jsonNode, jsonNode, DirtyType.TABLE_IDENTIFIER_PARSE_ERROR,
+                    e, TableIdentifier.of("unknow", "unknow"));
         }
         if (blacklist.contains(tableId)) {
             return;
@@ -156,7 +197,11 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
         }
     }
 
-    private void handleDirtyData(Object dirtyData, JsonNode rootNode, DirtyType dirtyType, Exception e) {
+    private void handleDirtyData(Object dirtyData,
+            JsonNode rootNode,
+            DirtyType dirtyType,
+            Exception e,
+            TableIdentifier tableId) {
         if (!dirtyOptions.ignoreDirty()) {
             RuntimeException ex;
             if (e instanceof RuntimeException) {
@@ -182,6 +227,10 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
                             .setIdentifier(dirtyOptions.getIdentifier());
                 }
                 dirtySink.invoke(builder.build());
+                if (metricData != null) {
+                    metricData.outputDirtyMetricsWithEstimate(
+                            tableId.namespace().toString(), null, tableId.name(), dirtyData);
+                }
             } catch (Exception ex) {
                 if (!dirtyOptions.ignoreSideOutputErrors()) {
                     throw new RuntimeException(ex);
@@ -198,6 +247,29 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
                 processingTimeService.getCurrentProcessingTime() + HELPER_DEBUG_INTERVEL, this);
     }
 
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        if (metricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        // init metric state
+        if (this.inlongMetric != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            String.format(INLONG_METRIC_STATE_NAME), TypeInformation.of(new TypeHint<MetricState>() {
+                            })));
+        }
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
+    }
+
     private void execDDL(JsonNode jsonNode, TableIdentifier tableId) {
         // todo:parse ddl sql
     }
@@ -242,7 +314,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
                                 LOG.warn("Ignore table {} schema change, old: {} new: {}.",
                                         tableId, dataSchema, latestSchema, e);
                                 blacklist.add(tableId);
-                                handleDirtyData(jsonNode, jsonNode, DirtyType.EXTRACT_ROWDATA_ERROR, e);
+                                handleDirtyData(jsonNode, jsonNode, DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId);
                             }
                             return Collections.emptyList();
                         });
@@ -329,7 +401,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
                     tableId,
                     pkListStr);
         } catch (Exception e) {
-            handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e);
+            handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e, tableId);
         }
         return null;
     }
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 1abd5b97a..3022ec6c0 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -17,10 +17,6 @@
 
 package org.apache.inlong.sort.iceberg.sink.multiple;
 
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -38,13 +34,9 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.sink.TaskWriterFactory;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
-import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
-import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
 import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
 import org.slf4j.Logger;
@@ -65,9 +57,7 @@ import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * Iceberg writer that can distinguish different sink tables and route and distribute data into different
@@ -93,10 +83,6 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
     // metric
     private final String inlongMetric;
     private final String auditHostAndPorts;
-    @Nullable
-    private transient SinkMetricData metricData;
-    private transient ListState<MetricState> metricStateListState;
-    private transient MetricState metricState;
     private final DirtyOptions dirtyOptions;
     private @Nullable final DirtySink<Object> dirtySink;
 
@@ -123,18 +109,6 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
         this.multipleWriters = new HashMap<>();
         this.multipleTables = new HashMap<>();
         this.multipleSchemas = new HashMap<>();
-
-        // Initialize metric
-        MetricOption metricOption = MetricOption.builder()
-                .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
-                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
-                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
-                .withRegisterMetric(RegisteredMetric.ALL)
-                .build();
-        if (metricOption != null) {
-            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
-        }
     }
 
     @Override
@@ -202,9 +176,14 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
                     appendMode);
 
             if (multipleWriters.get(tableId) == null) {
+                StringBuilder subWriterInlongMetric = new StringBuilder(inlongMetric);
+                subWriterInlongMetric.append(DELIMITER)
+                        .append(Constants.DATABASE_NAME).append("=").append(tableId.namespace().toString())
+                        .append(DELIMITER)
+                        .append(Constants.TABLE_NAME).append("=").append(tableId.name());
                 IcebergSingleStreamWriter<RowData> writer = new IcebergSingleStreamWriter<>(
-                        tableId.toString(), taskWriterFactory, null,
-                        null, flinkRowType, dirtyOptions, dirtySink);
+                        tableId.toString(), taskWriterFactory, subWriterInlongMetric.toString(),
+                        auditHostAndPorts, flinkRowType, dirtyOptions, dirtySink);
                 writer.setup(getRuntimeContext(),
                         new CallbackCollector<>(
                                 writeResult -> collector.collect(new MultipleWriteResult(tableId, writeResult))),
@@ -223,9 +202,6 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
         if (multipleWriters.get(tableId) != null) {
             for (RowData data : recordWithSchema.getData()) {
                 multipleWriters.get(tableId).processElement(data);
-                if (metricData != null) {
-                    metricData.invokeWithEstimate(data);
-                }
             }
         } else {
             LOG.error("Unregistered table schema for {}.", recordWithSchema.getTableId());
@@ -244,29 +220,11 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
         for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry : multipleWriters.entrySet()) {
             entry.getValue().snapshotState(context);
         }
-
-        // metric
-        if (metricData != null && metricStateListState != null) {
-            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
-                    getRuntimeContext().getIndexOfThisSubtask());
-        }
     }
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
         this.functionInitializationContext = context;
-
-        // init metric state
-        if (this.inlongMetric != null) {
-            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
-                    new ListStateDescriptor<>(
-                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
-                            })));
-        }
-        if (context.isRestored()) {
-            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
-                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
-        }
     }
 
     private boolean isSchemaUpdate(RecordWithSchema recordWithSchema) {
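
Each per-table sub-writer created above gets its own inlongMetric label, built by appending the database and table name of the Iceberg TableIdentifier to the writer-level label. A condensed sketch of that composition follows; the helper class and method names are illustrative and not part of this commit.

    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.inlong.sort.base.Constants;

    public final class SubWriterLabelSketch {

        /** Appends database and table labels to the writer-level inlongMetric string. */
        static String subWriterLabel(String inlongMetric, TableIdentifier tableId) {
            return new StringBuilder(inlongMetric)
                    .append(Constants.DELIMITER)
                    .append(Constants.DATABASE_NAME).append('=').append(tableId.namespace().toString())
                    .append(Constants.DELIMITER)
                    .append(Constants.TABLE_NAME).append('=').append(tableId.name())
                    .toString();
        }

        public static void main(String[] args) {
            // the label values below are placeholders; real labels come from the sink configuration
            TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
            System.out.println(subWriterLabel("groupId=g1&streamId=s1&nodeId=n1", tableId));
        }
    }

With the table-scoped label in place, each IcebergSingleStreamWriter reports its own metrics, which is why the per-record accounting previously done in IcebergMultipleStreamWriter is removed in the hunks above.
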
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
index c8bfbeb08..dc30c5b21 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
@@ -44,6 +44,8 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import java.io.IOException;
 
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -105,6 +107,8 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
                 .withInlongAudit(auditHostAndPorts)
                 .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
@@ -138,6 +142,9 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
                             .setRowType(flinkRowType)
                             .setDirtyMessage(e.getMessage());
                     dirtySink.invoke(builder.build());
+                    if (metricData != null) {
+                        metricData.invokeDirtyWithEstimate(value);
+                    }
                 } catch (Exception ex) {
                     if (!dirtyOptions.ignoreSideOutputErrors()) {
                         throw new RuntimeException(ex);
@@ -157,7 +164,8 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
         if (this.inlongMetric != null) {
             this.metricStateListState = context.getOperatorStateStore().getUnionListState(
                     new ListStateDescriptor<>(
-                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                            String.format("Iceberg(%s)-" + INLONG_METRIC_STATE_NAME, fullTableName),
+                            TypeInformation.of(new TypeHint<MetricState>() {
                             })));
         }
         if (context.isRestored()) {