You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/06 09:19:14 UTC

[incubator-inlong] branch master updated: [INLONG-4280][Manager] Add params for Iceberg sink (#4341)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a6b7d259 [INLONG-4280][Manager] Add params for Iceberg sink (#4341)
3a6b7d259 is described below

commit 3a6b7d259175bc53fc5ff6a3d2b13c8598f58a92
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Mon Jun 6 17:19:09 2022 +0800

    [INLONG-4280][Manager] Add params for Iceberg sink (#4341)
---
 .../manager/common/pojo/sink/iceberg/IcebergSink.java      |  6 ++++++
 .../manager/common/pojo/sink/iceberg/IcebergSinkDTO.java   |  8 ++++++++
 .../common/pojo/sink/iceberg/IcebergSinkListResponse.java  |  6 ++++++
 .../common/pojo/sink/iceberg/IcebergSinkRequest.java       |  6 ++++++
 .../inlong/manager/service/sort/util/LoadNodeUtils.java    |  9 ++++++---
 .../inlong/sort/protocol/constant/IcebergConstant.java     | 14 +++++++++++++-
 6 files changed, 45 insertions(+), 4 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
index 0b9ed63c4..01a1ea80f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
@@ -60,6 +60,12 @@ public class IcebergSink extends StreamSink {
     @ApiModelProperty("File format, support: Parquet, Orc, Avro")
     private String fileFormat;
 
+    @ApiModelProperty("Catalog type, like: hive, hadoop, default is hive")
+    private String catalogType = "hive";
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
     public IcebergSink() {
         this.setSinkType(SinkType.SINK_ICEBERG);
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
index 1d713eb53..017b0ce10 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -60,6 +60,12 @@ public class IcebergSinkDTO {
     @ApiModelProperty("File format, support: Parquet, Orc, Avro")
     private String fileFormat;
 
+    @ApiModelProperty("Catalog type, like: hive, hadoop, default is hive")
+    private String catalogType;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
     @ApiModelProperty("Properties for iceberg")
     private Map<String, Object> properties;
 
@@ -74,6 +80,8 @@ public class IcebergSinkDTO {
                 .tableName(request.getTableName())
                 .dataPath(request.getDataPath())
                 .fileFormat(request.getFileFormat())
+                .catalogType(request.getCatalogType())
+                .primaryKey(request.getPrimaryKey())
                 .properties(request.getProperties())
                 .build();
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java
index 88e799705..b97016663 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java
@@ -61,4 +61,10 @@ public class IcebergSinkListResponse extends SinkListResponse {
     @ApiModelProperty("partition type, like: H-hour, D-day, W-week, M-month, O-once, R-regulation")
     private String partitionType;
 
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    @ApiModelProperty("Catalog type, like: hive, hadoop, default is hive")
+    private String catalogType;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
index e4380f17d..dbe1649ae 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
@@ -54,4 +54,10 @@ public class IcebergSinkRequest extends SinkRequest {
     @ApiModelProperty("File format, support: Parquet, Orc, Avro")
     private String fileFormat;
 
+    @ApiModelProperty("Catalog type, like: hive, hadoop, default is hive")
+    private String catalogType = "hive";
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 6307dd071..173eb09d3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -35,6 +35,7 @@ import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSink;
 import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSink;
 import org.apache.inlong.manager.common.pojo.sink.sqlserver.SqlServerSink;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
 import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.format.AvroFormat;
 import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
@@ -275,6 +276,8 @@ public class LoadNodeUtils {
         String tableName = icebergSink.getTableName();
         String uri = icebergSink.getCatalogUri();
         String warehouse = icebergSink.getWarehouse();
+        String primaryKey = icebergSink.getPrimaryKey();
+        CatalogType catalogType = CatalogType.forName(icebergSink.getCatalogType());
 
         List<SinkField> sinkFields = icebergSink.getFieldList();
         List<FieldInfo> fields = sinkFields.stream()
@@ -283,9 +286,9 @@ public class LoadNodeUtils {
         List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFields, name);
         Map<String, String> properties = icebergSink.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
-        return new IcebergLoadNode(id, name, fields, fieldRelationShips, null, null, 1, properties,
-                dbName, tableName, null, null, uri, warehouse);
+        return new IcebergLoadNode(
+                id, name, fields, fieldRelationShips, null, null, 1, properties,
+                dbName, tableName, primaryKey, catalogType, uri, warehouse);
     }
 
     /**
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
index f846febd9..34d14b1a7 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/IcebergConstant.java
@@ -37,6 +37,18 @@ public class IcebergConstant {
         /**
          * Data stored in hybris metastore.
          */
-        HYBRIS
+        HYBRIS;
+
+        /**
+         * Get catalogType from name, ignoring case; throws IllegalArgumentException when nothing matches.
+         */
+        public static CatalogType forName(String name) {
+            for (CatalogType value : values()) {
+                if (value.name().equalsIgnoreCase(name)) { // callers pass lower-case names such as "hive"
+                    return value;
+                }
+            }
+            throw new IllegalArgumentException(String.format("Unsupport catalogType:%s", name));
+        }
     }
 }