You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/10 08:23:59 UTC

[incubator-inlong] branch release-1.2.0 updated: [INLONG-4612][Dashboard][Manager] Update Iceberg config UI and related protocol key names (#4618)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/release-1.2.0 by this push:
     new c554c91ee [INLONG-4612][Dashboard][Manager] Update Iceberg config UI and related protocol key names (#4618)
c554c91ee is described below

commit c554c91ee71a27c146e8bb9e5c90d97b271fc9b4
Author: woofyzhao <49...@qq.com>
AuthorDate: Fri Jun 10 15:59:24 2022 +0800

    [INLONG-4612][Dashboard][Manager] Update Iceberg config UI and related protocol key names (#4618)
---
 .../src/components/MetaData/StorageIceberg.tsx     | 54 +++++++-------------
 inlong-dashboard/src/locales/cn.json               |  1 +
 inlong-dashboard/src/locales/en.json               |  1 +
 .../common/pojo/sink/iceberg/IcebergSink.java      |  4 +-
 .../pojo/sink/iceberg/IcebergSinkRequest.java      |  4 +-
 .../manager/plugin/flink/FlinkOperation.java       | 57 +++++++++++++---------
 6 files changed, 57 insertions(+), 64 deletions(-)

diff --git a/inlong-dashboard/src/components/MetaData/StorageIceberg.tsx b/inlong-dashboard/src/components/MetaData/StorageIceberg.tsx
index 375a13531..259ca8d5e 100644
--- a/inlong-dashboard/src/components/MetaData/StorageIceberg.tsx
+++ b/inlong-dashboard/src/components/MetaData/StorageIceberg.tsx
@@ -35,18 +35,17 @@ import { sourceDataFields } from './SourceDataFields';
 const icebergFieldTypes = [
   'string',
   'boolean',
-  'short',
   'int',
   'long',
   'float',
   'double',
-  'decimal(P,S)',
+  'decimal',
   'date',
   'time',
   'timestamp',
   'timestamptz',
   'binary',
-  'fixed(L)',
+  'fixed',
   'uuid',
 ].map(item => ({
   label: item,
@@ -55,6 +54,11 @@ const icebergFieldTypes = [
 
 const matchPartitionStrategies = fieldType => {
   const data = [
+    {
+      label: 'None',
+      value: 'None',
+      disabled: false,
+    },
     {
       label: 'Identity',
       value: 'Identity',
@@ -91,13 +95,13 @@ const matchPartitionStrategies = fieldType => {
         'long',
         'float',
         'double',
-        'decimal(P,S)',
+        'decimal',
       ].includes(fieldType),
     },
     {
       label: 'Truncate',
       value: 'Truncate',
-      disabled: !['string', 'int', 'long', 'binary', 'decimal(P,S)'].includes(fieldType),
+      disabled: !['string', 'int', 'long', 'binary', 'decimal'].includes(fieldType),
     },
   ];
 
@@ -152,33 +156,11 @@ const getForm: GetStorageFormFieldsType = (
     },
     {
       type: 'input',
-      label: i18n.t('components.AccessHelper.StorageMetaData.Username'),
-      name: 'username',
-      rules: [{ required: true }],
-      props: {
-        disabled: isEdit && [110, 130].includes(currentValues?.status),
-      },
-      _inTable: true,
-    },
-    {
-      type: 'password',
-      label: i18n.t('components.AccessHelper.StorageMetaData.Password'),
-      name: 'password',
-      rules: [{ required: true }],
-      props: {
-        disabled: isEdit && [110, 130].includes(currentValues?.status),
-        style: {
-          maxWidth: 500,
-        },
-      },
-    },
-    {
-      type: 'input',
-      label: 'JDBC URL',
-      name: 'jdbcUrl',
+      label: 'Catalog URI',
+      name: 'catalogUri',
       rules: [{ required: true }],
       props: {
-        placeholder: 'jdbc:hive2://127.0.0.1:10000',
+        placeholder: 'thrift://127.0.0.1:9083',
         disabled: isEdit && [110, 130].includes(currentValues?.status),
         style: { width: 500 },
       },
@@ -206,8 +188,8 @@ const getForm: GetStorageFormFieldsType = (
     },
     {
       type: 'input',
-      label: i18n.t('components.AccessHelper.StorageMetaData.Hive.DataPath'),
-      name: 'dataPath',
+      label: i18n.t('components.AccessHelper.StorageMetaData.Iceberg.Warehouse'),
+      name: 'warehouse',
       rules: [{ required: true }],
       props: {
         placeholder: 'hdfs://127.0.0.1:9000/user/iceberg/warehouse',
@@ -348,7 +330,7 @@ const getFieldListColumns: GetStorageColumnsType = (dataType, currentValues) =>
       },
       initialValue: 1,
       rules: [{ type: 'number', required: true }],
-      visible: (text, record) => record.fieldType === 'fixed(L)',
+      visible: (text, record) => record.fieldType === 'fixed',
     },
     {
       title: 'Precision',
@@ -359,7 +341,7 @@ const getFieldListColumns: GetStorageColumnsType = (dataType, currentValues) =>
       },
       initialValue: 1,
       rules: [{ type: 'number', required: true }],
-      visible: (text, record) => record.fieldType === 'decimal(P,S)',
+      visible: (text, record) => record.fieldType === 'decimal',
     },
     {
       title: 'Scale',
@@ -370,13 +352,13 @@ const getFieldListColumns: GetStorageColumnsType = (dataType, currentValues) =>
       },
       initialValue: 1,
       rules: [{ type: 'number', required: true }],
-      visible: (text, record) => record.fieldType === 'decimal(P,S)',
+      visible: (text, record) => record.fieldType === 'decimal',
     },
     {
       title: i18n.t('components.AccessHelper.StorageMetaData.Iceberg.PartitionStrategy'),
       dataIndex: 'partitionStrategy',
       type: 'select',
-      initialValue: 'Identity',
+      initialValue: 'None',
       rules: [{ required: true }],
       props: (text, record) => ({
         options: matchPartitionStrategies(record.fieldType),
diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index a9a1fbe1f..b4219a24b 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -95,6 +95,7 @@
   "components.AccessHelper.StorageMetaData.Kafka.AutoOffsetReset": "自动偏移量重置",
   "components.AccessHelper.StorageMetaData.Iceberg.DbName": "DB名称",
   "components.AccessHelper.StorageMetaData.Iceberg.TableName": "表名称",
+  "components.AccessHelper.StorageMetaData.Iceberg.Warehouse": "仓库路径",
   "components.AccessHelper.StorageMetaData.Iceberg.FileFormat": "⽂件格式",
   "components.AccessHelper.StorageMetaData.Iceberg.Description": "表描述",
   "components.AccessHelper.StorageMetaData.Iceberg.ExtList": "属性",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index 222941100..5756da64c 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -95,6 +95,7 @@
   "components.AccessHelper.StorageMetaData.Kafka.AutoOffsetReset": "AutoOffsetReset",
   "components.AccessHelper.StorageMetaData.Iceberg.DbName": "DbName",
   "components.AccessHelper.StorageMetaData.Iceberg.TableName": "TableName",
+  "components.AccessHelper.StorageMetaData.Iceberg.Warehouse": "Warehouse",
   "components.AccessHelper.StorageMetaData.Iceberg.FileFormat": "FileFormat",
   "components.AccessHelper.StorageMetaData.Iceberg.Description": "Description",
   "components.AccessHelper.StorageMetaData.Iceberg.ExtList": "ExtList",
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
index 01a1ea80f..9b80865a2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
@@ -60,8 +60,8 @@ public class IcebergSink extends StreamSink {
     @ApiModelProperty("File format, support: Parquet, Orc, Avro")
     private String fileFormat;
 
-    @ApiModelProperty("Catalog type, like: hive, hadoop, default is hive")
-    private String catalogType = "hive";
+    @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+    private String catalogType = "HIVE";
 
     @ApiModelProperty("Primary key")
     private String primaryKey;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
index dbe1649ae..93f10df7e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
@@ -54,8 +54,8 @@ public class IcebergSinkRequest extends SinkRequest {
     @ApiModelProperty("File format, support: Parquet, Orc, Avro")
     private String fileFormat;
 
-    @ApiModelProperty("Catalog type, like: hive, hadoop, default is hive")
-    private String catalogType = "hive";
+    @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+    private String catalogType = "HIVE";
 
     @ApiModelProperty("Primary key")
     private String primaryKey;
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index a22afb221..4fefc75a8 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -45,7 +45,6 @@ public class FlinkOperation {
     private static final String INLONG_SORT = "inlong-sort";
     private static final String SORT_JAR_PATTERN = "^sort-dist.*jar$";
     private static final String SORT_PLUGIN = "sort-plugin" + File.separator + "connectors";
-    private static final String CONNECTOR_JAR_PATTERN = "^sort-connector.*jar$";
 
     private final FlinkService flinkService;
 
@@ -53,6 +52,35 @@ public class FlinkOperation {
         this.flinkService = flinkService;
     }
 
+    /**
+     * Get Sort connector jar patterns from the Flink info.
+     */
+    private static String getConnectorJarPattern(FlinkInfo flinkInfo) {
+        if (StringUtils.isNotEmpty(flinkInfo.getSourceType()) && StringUtils.isNotEmpty(flinkInfo.getSinkType())) {
+            return String.format("^sort-connector-(?i)(%s|%s).*jar$", flinkInfo.getSourceType(),
+                    flinkInfo.getSinkType());
+        } else {
+            return "^sort-connector-.*jar$";
+        }
+    }
+
+    /**
+     * Restart the Flink job.
+     */
+    public void restart(FlinkInfo flinkInfo) throws Exception {
+        String jobId = flinkInfo.getJobId();
+        boolean terminated = isNullOrTerminated(jobId);
+        if (terminated) {
+            String message = String.format("restart job failed, as " + JOB_TERMINATED_MSG, jobId);
+            log.error(message);
+            throw new Exception(message);
+        }
+
+        Future<?> future = TaskRunService.submit(
+                new IntegrationTaskRunner(flinkService, flinkInfo, TaskCommitType.RESTART.getCode()));
+        future.get();
+    }
+
     /**
      * Start the Flink job, if the job id was not empty, restore it.
      */
@@ -112,7 +140,7 @@ public class FlinkOperation {
         log.info("get sort jar path success, path: {}", jarPath);
 
         String pluginPath = startPath + SORT_PLUGIN;
-        List<String> connectorPaths = FlinkUtils.listFiles(pluginPath, CONNECTOR_JAR_PATTERN, -1);
+        List<String> connectorPaths = FlinkUtils.listFiles(pluginPath, getConnectorJarPattern(flinkInfo), -1);
         flinkInfo.setConnectorJarPaths(connectorPaths);
         log.info("get sort connector paths success, paths: {}", connectorPaths);
 
@@ -125,23 +153,6 @@ public class FlinkOperation {
         }
     }
 
-    /**
-     * Restart the Flink job.
-     */
-    public void restart(FlinkInfo flinkInfo) throws Exception {
-        String jobId = flinkInfo.getJobId();
-        boolean terminated = isNullOrTerminated(jobId);
-        if (terminated) {
-            String message = String.format("restart job failed, as " + JOB_TERMINATED_MSG, jobId);
-            log.error(message);
-            throw new Exception(message);
-        }
-
-        Future<?> future = TaskRunService.submit(new IntegrationTaskRunner(flinkService, flinkInfo,
-                TaskCommitType.RESTART.getCode()));
-        future.get();
-    }
-
     /**
      * Stop the Flink job.
      */
@@ -155,8 +166,7 @@ public class FlinkOperation {
         }
 
         Future<?> future = TaskRunService.submit(
-                new IntegrationTaskRunner(flinkService, flinkInfo,
-                        TaskCommitType.STOP.getCode()));
+                new IntegrationTaskRunner(flinkService, flinkInfo, TaskCommitType.STOP.getCode()));
         future.get();
     }
 
@@ -178,8 +188,7 @@ public class FlinkOperation {
         }
 
         Future<?> future = TaskRunService.submit(
-                new IntegrationTaskRunner(flinkService, flinkInfo,
-                        TaskCommitType.DELETE.getCode()));
+                new IntegrationTaskRunner(flinkService, flinkInfo, TaskCommitType.DELETE.getCode()));
         future.get();
     }
 
@@ -230,7 +239,7 @@ public class FlinkOperation {
         boolean terminated = jobDetailsInfo == null || jobDetailsInfo.getJobStatus() == null;
         if (terminated) {
             log.warn("job detail or job status was null for [{}]", jobId);
-            return terminated;
+            return true;
         }
 
         terminated = jobDetailsInfo.getJobStatus().isTerminalState();