You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/06 09:19:14 UTC
[incubator-inlong] branch master updated: [INLONG-4280][Manager] Add params for Iceberg sink (#4341)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3a6b7d259 [INLONG-4280][Manager] Add params for Iceberg sink (#4341)
3a6b7d259 is described below
commit 3a6b7d259175bc53fc5ff6a3d2b13c8598f58a92
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Mon Jun 6 17:19:09 2022 +0800
[INLONG-4280][Manager] Add params for Iceberg sink (#4341)
---
.../manager/common/pojo/sink/iceberg/IcebergSink.java | 6 ++++++
.../manager/common/pojo/sink/iceberg/IcebergSinkDTO.java | 8 ++++++++
.../common/pojo/sink/iceberg/IcebergSinkListResponse.java | 6 ++++++
.../common/pojo/sink/iceberg/IcebergSinkRequest.java | 6 ++++++
.../inlong/manager/service/sort/util/LoadNodeUtils.java | 9 ++++++---
.../inlong/sort/protocol/constant/IcebergConstant.java | 14 +++++++++++++-
6 files changed, 45 insertions(+), 4 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
index 0b9ed63c4..01a1ea80f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
@@ -60,6 +60,12 @@ public class IcebergSink extends StreamSink {
@ApiModelProperty("File format, support: Parquet, Orc, Avro")
private String fileFormat;
+ @ApiModelProperty("Catalog type, like: hive, hadoop, default is hive")
+ private String catalogType = "hive";
+
+ @ApiModelProperty("Primary key")
+ private String primaryKey;
+
public IcebergSink() {
this.setSinkType(SinkType.SINK_ICEBERG);
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
index 1d713eb53..017b0ce10 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -60,6 +60,12 @@ public class IcebergSinkDTO {
@ApiModelProperty("File format, support: Parquet, Orc, Avro")
private String fileFormat;
+ @ApiModelProperty("Catalog type, like: hive, hadoop, default is hive")
+ private String catalogType;
+
+ @ApiModelProperty("Primary key")
+ private String primaryKey;
+
@ApiModelProperty("Properties for iceberg")
private Map<String, Object> properties;
@@ -74,6 +80,8 @@ public class IcebergSinkDTO {
.tableName(request.getTableName())
.dataPath(request.getDataPath())
.fileFormat(request.getFileFormat())
+ .catalogType(request.getCatalogType())
+ .primaryKey(request.getPrimaryKey())
.properties(request.getProperties())
.build();
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java
index 88e799705..b97016663 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java
@@ -61,4 +61,10 @@ public class IcebergSinkListResponse extends SinkListResponse {
@ApiModelProperty("partition type, like: H-hour, D-day, W-week, M-month, O-once, R-regulation")
private String partitionType;
+ @ApiModelProperty("Primary key")
+ private String primaryKey;
+
+ @ApiModelProperty("Catalog type, like: hive, hadoop, default is hive")
+ private String catalogType;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
index e4380f17d..dbe1649ae 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
@@ -54,4 +54,10 @@ public class IcebergSinkRequest extends SinkRequest {
@ApiModelProperty("File format, support: Parquet, Orc, Avro")
private String fileFormat;
+ @ApiModelProperty("Catalog type, like: hive, hadoop, default is hive")
+ private String catalogType = "hive";
+
+ @ApiModelProperty("Primary key")
+ private String primaryKey;
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 6307dd071..173eb09d3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -35,6 +35,7 @@ import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSink;
import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSink;
import org.apache.inlong.manager.common.pojo.sink.sqlserver.SqlServerSink;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.node.format.AvroFormat;
import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
@@ -275,6 +276,8 @@ public class LoadNodeUtils {
String tableName = icebergSink.getTableName();
String uri = icebergSink.getCatalogUri();
String warehouse = icebergSink.getWarehouse();
+ String primaryKey = icebergSink.getPrimaryKey();
+ CatalogType catalogType = CatalogType.forName(icebergSink.getCatalogType());
List<SinkField> sinkFields = icebergSink.getFieldList();
List<FieldInfo> fields = sinkFields.stream()
@@ -283,9 +286,9 @@ public class LoadNodeUtils {
List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFields, name);
Map<String, String> properties = icebergSink.getProperties().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
- return new IcebergLoadNode(id, name, fields, fieldRelationShips, null, null, 1, properties,
- dbName, tableName, null, null, uri, warehouse);
+ return new IcebergLoadNode(
+ id, name, fields, fieldRelationShips, null, null, 1, properties,
+ dbName, tableName, primaryKey, catalogType, uri, warehouse);
}
/**
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
index f846febd9..34d14b1a7 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
@@ -37,6 +37,18 @@ public class IcebergConstant {
/**
* Data stored in hybris metastore.
*/
- HYBRIS
+ HYBRIS;
+
+ /**
+ * Get catalogType from name, ignoring case (e.g. "hive" resolves to HIVE); throws IllegalArgumentException if no constant matches.
+ */
+ public static CatalogType forName(String name) {
+ for (CatalogType value : values()) {
+ if (value.name().equalsIgnoreCase(name)) {
+ return value;
+ }
+ }
+ throw new IllegalArgumentException(String.format("Unsupport catalogType:%s", name));
+ }
}
}