Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/10 08:23:59 UTC
[incubator-inlong] branch release-1.2.0 updated: [INLONG-4612][Dashboard][Manager] Update Iceberg config UI and related protocol key names (#4618)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/release-1.2.0 by this push:
new c554c91ee [INLONG-4612][Dashboard][Manager] Update Iceberg config UI and related protocol key names (#4618)
c554c91ee is described below
commit c554c91ee71a27c146e8bb9e5c90d97b271fc9b4
Author: woofyzhao <49...@qq.com>
AuthorDate: Fri Jun 10 15:59:24 2022 +0800
[INLONG-4612][Dashboard][Manager] Update Iceberg config UI and related protocol key names (#4618)
---
.../src/components/MetaData/StorageIceberg.tsx | 54 +++++++-------------
inlong-dashboard/src/locales/cn.json | 1 +
inlong-dashboard/src/locales/en.json | 1 +
.../common/pojo/sink/iceberg/IcebergSink.java | 4 +-
.../pojo/sink/iceberg/IcebergSinkRequest.java | 4 +-
.../manager/plugin/flink/FlinkOperation.java | 57 +++++++++++++---------
6 files changed, 57 insertions(+), 64 deletions(-)
diff --git a/inlong-dashboard/src/components/MetaData/StorageIceberg.tsx b/inlong-dashboard/src/components/MetaData/StorageIceberg.tsx
index 375a13531..259ca8d5e 100644
--- a/inlong-dashboard/src/components/MetaData/StorageIceberg.tsx
+++ b/inlong-dashboard/src/components/MetaData/StorageIceberg.tsx
@@ -35,18 +35,17 @@ import { sourceDataFields } from './SourceDataFields';
const icebergFieldTypes = [
'string',
'boolean',
- 'short',
'int',
'long',
'float',
'double',
- 'decimal(P,S)',
+ 'decimal',
'date',
'time',
'timestamp',
'timestamptz',
'binary',
- 'fixed(L)',
+ 'fixed',
'uuid',
].map(item => ({
label: item,
@@ -55,6 +54,11 @@ const icebergFieldTypes = [
const matchPartitionStrategies = fieldType => {
const data = [
+ {
+ label: 'None',
+ value: 'None',
+ disabled: false,
+ },
{
label: 'Identity',
value: 'Identity',
@@ -91,13 +95,13 @@ const matchPartitionStrategies = fieldType => {
'long',
'float',
'double',
- 'decimal(P,S)',
+ 'decimal',
].includes(fieldType),
},
{
label: 'Truncate',
value: 'Truncate',
- disabled: !['string', 'int', 'long', 'binary', 'decimal(P,S)'].includes(fieldType),
+ disabled: !['string', 'int', 'long', 'binary', 'decimal'].includes(fieldType),
},
];
@@ -152,33 +156,11 @@ const getForm: GetStorageFormFieldsType = (
},
{
type: 'input',
- label: i18n.t('components.AccessHelper.StorageMetaData.Username'),
- name: 'username',
- rules: [{ required: true }],
- props: {
- disabled: isEdit && [110, 130].includes(currentValues?.status),
- },
- _inTable: true,
- },
- {
- type: 'password',
- label: i18n.t('components.AccessHelper.StorageMetaData.Password'),
- name: 'password',
- rules: [{ required: true }],
- props: {
- disabled: isEdit && [110, 130].includes(currentValues?.status),
- style: {
- maxWidth: 500,
- },
- },
- },
- {
- type: 'input',
- label: 'JDBC URL',
- name: 'jdbcUrl',
+ label: 'Catalog URI',
+ name: 'catalogUri',
rules: [{ required: true }],
props: {
- placeholder: 'jdbc:hive2://127.0.0.1:10000',
+ placeholder: 'thrift://127.0.0.1:9083',
disabled: isEdit && [110, 130].includes(currentValues?.status),
style: { width: 500 },
},
@@ -206,8 +188,8 @@ const getForm: GetStorageFormFieldsType = (
},
{
type: 'input',
- label: i18n.t('components.AccessHelper.StorageMetaData.Hive.DataPath'),
- name: 'dataPath',
+ label: i18n.t('components.AccessHelper.StorageMetaData.Iceberg.Warehouse'),
+ name: 'warehouse',
rules: [{ required: true }],
props: {
placeholder: 'hdfs://127.0.0.1:9000/user/iceberg/warehouse',
@@ -348,7 +330,7 @@ const getFieldListColumns: GetStorageColumnsType = (dataType, currentValues) =>
},
initialValue: 1,
rules: [{ type: 'number', required: true }],
- visible: (text, record) => record.fieldType === 'fixed(L)',
+ visible: (text, record) => record.fieldType === 'fixed',
},
{
title: 'Precision',
@@ -359,7 +341,7 @@ const getFieldListColumns: GetStorageColumnsType = (dataType, currentValues) =>
},
initialValue: 1,
rules: [{ type: 'number', required: true }],
- visible: (text, record) => record.fieldType === 'decimal(P,S)',
+ visible: (text, record) => record.fieldType === 'decimal',
},
{
title: 'Scale',
@@ -370,13 +352,13 @@ const getFieldListColumns: GetStorageColumnsType = (dataType, currentValues) =>
},
initialValue: 1,
rules: [{ type: 'number', required: true }],
- visible: (text, record) => record.fieldType === 'decimal(P,S)',
+ visible: (text, record) => record.fieldType === 'decimal',
},
{
title: i18n.t('components.AccessHelper.StorageMetaData.Iceberg.PartitionStrategy'),
dataIndex: 'partitionStrategy',
type: 'select',
- initialValue: 'Identity',
+ initialValue: 'None',
rules: [{ required: true }],
props: (text, record) => ({
options: matchPartitionStrategies(record.fieldType),
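[Note: not part of the commit. The hunks above gate each Iceberg partition strategy by field-type compatibility and add an always-enabled "None" option, which also becomes the default. A minimal Java sketch of that gating, modeling only the two strategies fully visible in the diff ("None" and "Truncate"; the class and method names are illustrative, not from the codebase):

import java.util.List;
import java.util.Set;

public class IcebergPartitionStrategiesSketch {

    // Types compatible with "Truncate", per the hunk above.
    private static final Set<String> TRUNCATE_TYPES =
            Set.of("string", "int", "long", "binary", "decimal");

    /** Returns the partition strategies a field of the given Iceberg type may use. */
    static List<String> strategiesFor(String fieldType) {
        // "None" is selectable for every type; "Truncate" only for its compatible types.
        return TRUNCATE_TYPES.contains(fieldType)
                ? List.of("None", "Truncate")
                : List.of("None");
    }

    public static void main(String[] args) {
        System.out.println(strategiesFor("decimal")); // [None, Truncate]
        System.out.println(strategiesFor("uuid"));    // [None]
    }
}

The other strategies (Identity, and the time/bucket-style ones partially visible above) follow the same pattern: a fixed compatible-type set per strategy.]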
diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index a9a1fbe1f..b4219a24b 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -95,6 +95,7 @@
"components.AccessHelper.StorageMetaData.Kafka.AutoOffsetReset": "自动偏移量重置",
"components.AccessHelper.StorageMetaData.Iceberg.DbName": "DB名称",
"components.AccessHelper.StorageMetaData.Iceberg.TableName": "表名称",
+ "components.AccessHelper.StorageMetaData.Iceberg.Warehouse": "仓库路径",
"components.AccessHelper.StorageMetaData.Iceberg.FileFormat": "⽂件格式",
"components.AccessHelper.StorageMetaData.Iceberg.Description": "表描述",
"components.AccessHelper.StorageMetaData.Iceberg.ExtList": "属性",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index 222941100..5756da64c 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -95,6 +95,7 @@
"components.AccessHelper.StorageMetaData.Kafka.AutoOffsetReset": "AutoOffsetReset",
"components.AccessHelper.StorageMetaData.Iceberg.DbName": "DbName",
"components.AccessHelper.StorageMetaData.Iceberg.TableName": "TableName",
+ "components.AccessHelper.StorageMetaData.Iceberg.Warehouse": "Warehouse",
"components.AccessHelper.StorageMetaData.Iceberg.FileFormat": "FileFormat",
"components.AccessHelper.StorageMetaData.Iceberg.Description": "Description",
"components.AccessHelper.StorageMetaData.Iceberg.ExtList": "ExtList",
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
index 01a1ea80f..9b80865a2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
@@ -60,8 +60,8 @@ public class IcebergSink extends StreamSink {
@ApiModelProperty("File format, support: Parquet, Orc, Avro")
private String fileFormat;
- @ApiModelProperty("Catalog type, like: hive, hadoop, default is hive")
- private String catalogType = "hive";
+ @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+ private String catalogType = "HIVE";
@ApiModelProperty("Primary key")
private String primaryKey;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
index dbe1649ae..93f10df7e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
@@ -54,8 +54,8 @@ public class IcebergSinkRequest extends SinkRequest {
@ApiModelProperty("File format, support: Parquet, Orc, Avro")
private String fileFormat;
- @ApiModelProperty("Catalog type, like: hive, hadoop, default is hive")
- private String catalogType = "hive";
+ @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+ private String catalogType = "HIVE";
@ApiModelProperty("Primary key")
private String primaryKey;
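[Note: not part of the commit. Both POJOs above now default catalogType to the upper-case "HIVE", and the Swagger description advertises "HIVE"/"HADOOP". A minimal, illustrative sketch of normalizing a client-supplied value to the casing the protocol now uses (the helper below is hypothetical, not an InLong API):

import java.util.Locale;

public class CatalogTypeSketch {

    /** Upper-cases a raw catalog type, falling back to the new default "HIVE". */
    static String normalizeCatalogType(String raw) {
        return (raw == null || raw.isEmpty())
                ? "HIVE"
                : raw.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(normalizeCatalogType("hive")); // HIVE
        System.out.println(normalizeCatalogType(null));   // HIVE
    }
}]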
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index a22afb221..4fefc75a8 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -45,7 +45,6 @@ public class FlinkOperation {
private static final String INLONG_SORT = "inlong-sort";
private static final String SORT_JAR_PATTERN = "^sort-dist.*jar$";
private static final String SORT_PLUGIN = "sort-plugin" + File.separator + "connectors";
- private static final String CONNECTOR_JAR_PATTERN = "^sort-connector.*jar$";
private final FlinkService flinkService;
@@ -53,6 +52,35 @@ public class FlinkOperation {
this.flinkService = flinkService;
}
+ /**
+ * Get Sort connector jar patterns from the Flink info.
+ */
+ private static String getConnectorJarPattern(FlinkInfo flinkInfo) {
+ if (StringUtils.isNotEmpty(flinkInfo.getSourceType()) && StringUtils.isNotEmpty(flinkInfo.getSinkType())) {
+ return String.format("^sort-connector-(?i)(%s|%s).*jar$", flinkInfo.getSourceType(),
+ flinkInfo.getSinkType());
+ } else {
+ return "^sort-connector-.*jar$";
+ }
+ }
+
+ /**
+ * Restart the Flink job.
+ */
+ public void restart(FlinkInfo flinkInfo) throws Exception {
+ String jobId = flinkInfo.getJobId();
+ boolean terminated = isNullOrTerminated(jobId);
+ if (terminated) {
+ String message = String.format("restart job failed, as " + JOB_TERMINATED_MSG, jobId);
+ log.error(message);
+ throw new Exception(message);
+ }
+
+ Future<?> future = TaskRunService.submit(
+ new IntegrationTaskRunner(flinkService, flinkInfo, TaskCommitType.RESTART.getCode()));
+ future.get();
+ }
+
/**
* Start the Flink job, if the job id was not empty, restore it.
*/
@@ -112,7 +140,7 @@ public class FlinkOperation {
log.info("get sort jar path success, path: {}", jarPath);
String pluginPath = startPath + SORT_PLUGIN;
- List<String> connectorPaths = FlinkUtils.listFiles(pluginPath, CONNECTOR_JAR_PATTERN, -1);
+ List<String> connectorPaths = FlinkUtils.listFiles(pluginPath, getConnectorJarPattern(flinkInfo), -1);
flinkInfo.setConnectorJarPaths(connectorPaths);
log.info("get sort connector paths success, paths: {}", connectorPaths);
@@ -125,23 +153,6 @@ public class FlinkOperation {
}
}
- /**
- * Restart the Flink job.
- */
- public void restart(FlinkInfo flinkInfo) throws Exception {
- String jobId = flinkInfo.getJobId();
- boolean terminated = isNullOrTerminated(jobId);
- if (terminated) {
- String message = String.format("restart job failed, as " + JOB_TERMINATED_MSG, jobId);
- log.error(message);
- throw new Exception(message);
- }
-
- Future<?> future = TaskRunService.submit(new IntegrationTaskRunner(flinkService, flinkInfo,
- TaskCommitType.RESTART.getCode()));
- future.get();
- }
-
/**
* Stop the Flink job.
*/
@@ -155,8 +166,7 @@ public class FlinkOperation {
}
Future<?> future = TaskRunService.submit(
- new IntegrationTaskRunner(flinkService, flinkInfo,
- TaskCommitType.STOP.getCode()));
+ new IntegrationTaskRunner(flinkService, flinkInfo, TaskCommitType.STOP.getCode()));
future.get();
}
@@ -178,8 +188,7 @@ public class FlinkOperation {
}
Future<?> future = TaskRunService.submit(
- new IntegrationTaskRunner(flinkService, flinkInfo,
- TaskCommitType.DELETE.getCode()));
+ new IntegrationTaskRunner(flinkService, flinkInfo, TaskCommitType.DELETE.getCode()));
future.get();
}
@@ -230,7 +239,7 @@ public class FlinkOperation {
boolean terminated = jobDetailsInfo == null || jobDetailsInfo.getJobStatus() == null;
if (terminated) {
log.warn("job detail or job status was null for [{}]", jobId);
- return terminated;
+ return true;
}
terminated = jobDetailsInfo.getJobStatus().isTerminalState();
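[Note: not part of the commit. The new getConnectorJarPattern(...) above replaces the fixed CONNECTOR_JAR_PATTERN so that only the connector jars matching the job's source and sink types are picked up. A small demonstration of the regex it builds; the type values "kafka" and "iceberg" are example inputs, not taken from the commit:

import java.util.regex.Pattern;

public class ConnectorJarPatternDemo {
    public static void main(String[] args) {
        String pattern = String.format("^sort-connector-(?i)(%s|%s).*jar$", "kafka", "iceberg");

        // (?i) makes matching case-insensitive from that point onward, so any
        // casing of the source/sink type in the jar name still matches.
        System.out.println(Pattern.matches(pattern, "sort-connector-kafka-1.2.0.jar"));   // true
        System.out.println(Pattern.matches(pattern, "sort-connector-Iceberg-1.2.0.jar")); // true
        System.out.println(Pattern.matches(pattern, "sort-connector-mysql-1.2.0.jar"));   // false

        // When source or sink type is unset, the fallback pattern matches every
        // connector jar, preserving the old behavior.
        System.out.println(Pattern.matches("^sort-connector-.*jar$",
                "sort-connector-mysql-1.2.0.jar")); // true
    }
}]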