You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/01 13:26:31 UTC

[incubator-inlong] branch master updated: [INLONG-4408][Manager][Sort] Add Iceberg sink load node (#4409)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 04d7a5f1c [INLONG-4408][Manager][Sort] Add Iceberg sink load node (#4409)
04d7a5f1c is described below

commit 04d7a5f1c74d532f680e95d4e7e80401cdb81c05
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Jun 1 21:26:26 2022 +0800

    [INLONG-4408][Manager][Sort] Add Iceberg sink load node (#4409)
---
 .../common/pojo/sink/iceberg/IcebergSink.java      | 20 ++++------------
 .../common/pojo/sink/iceberg/IcebergSinkDTO.java   | 28 +++++-----------------
 .../pojo/sink/iceberg/IcebergSinkListResponse.java |  7 ++++--
 .../pojo/sink/iceberg/IcebergSinkRequest.java      | 20 ++++------------
 .../resource/iceberg/IcebergResourceOperator.java  |  4 ++--
 .../manager/service/sort/util/LoadNodeUtils.java   | 27 +++++++++++++++++++++
 .../apache/inlong/sort/protocol/node/LoadNode.java | 16 +++++++------
 .../sort/protocol/node/load/HbaseLoadNode.java     | 10 ++++----
 .../sort/protocol/node/load/HbaseLoadNodeTest.java |  3 +++
 ...eLoadNodeTest.java => IcebergLoadNodeTest.java} | 25 ++++++++++++++-----
 10 files changed, 84 insertions(+), 76 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
index 65a9555ea..0b9ed63c4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
@@ -42,14 +42,11 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @JsonTypeDefine(value = SinkType.SINK_ICEBERG)
 public class IcebergSink extends StreamSink {
 
-    @ApiModelProperty("Hive JDBC URL")
-    private String jdbcUrl;
+    @ApiModelProperty("Catalog URI")
+    private String catalogUri;
 
-    @ApiModelProperty("Username for JDBC URL")
-    private String username;
-
-    @ApiModelProperty("User password")
-    private String password;
+    @ApiModelProperty("Data warehouse")
+    private String warehouse;
 
     @ApiModelProperty("Target database name")
     private String dbName;
@@ -63,15 +60,6 @@ public class IcebergSink extends StreamSink {
     @ApiModelProperty("File format, support: Parquet, Orc, Avro")
     private String fileFormat;
 
-    @ApiModelProperty("Data encoding type")
-    private String dataEncoding;
-
-    @ApiModelProperty("Data field separator")
-    private String dataSeparator;
-
-    @ApiModelProperty("Data consistency strategy, support: EXACTLY_ONCE(default), AT_LEAST_ONCE")
-    private String dataConsistency;
-
     public IcebergSink() {
         this.setSinkType(SinkType.SINK_ICEBERG);
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
index 53e48fa50..1d713eb53 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -42,14 +42,11 @@ public class IcebergSinkDTO {
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    @ApiModelProperty("Hive JDBC URL")
-    private String jdbcUrl;
+    @ApiModelProperty("Catalog uri, such as hive metastore thrift://ip:port")
+    private String catalogUri;
 
-    @ApiModelProperty("Username for JDBC URL")
-    private String username;
-
-    @ApiModelProperty("User password")
-    private String password;
+    @ApiModelProperty("Iceberg data warehouse dir")
+    private String warehouse;
 
     @ApiModelProperty("Target database name")
     private String dbName;
@@ -63,15 +60,6 @@ public class IcebergSinkDTO {
     @ApiModelProperty("File format, support: Parquet, Orc, Avro")
     private String fileFormat;
 
-    @ApiModelProperty("Data encoding type")
-    private String dataEncoding;
-
-    @ApiModelProperty("Data field separator")
-    private String dataSeparator;
-
-    @ApiModelProperty("Data consistency strategy, support: EXACTLY_ONCE(default), AT_LEAST_ONCE")
-    private String dataConsistency;
-
     @ApiModelProperty("Properties for iceberg")
     private Map<String, Object> properties;
 
@@ -80,16 +68,12 @@ public class IcebergSinkDTO {
      */
     public static IcebergSinkDTO getFromRequest(IcebergSinkRequest request) {
         return IcebergSinkDTO.builder()
-                .jdbcUrl(request.getJdbcUrl())
-                .username(request.getUsername())
-                .password(request.getPassword())
+                .catalogUri(request.getCatalogUri())
+                .warehouse(request.getWarehouse())
                 .dbName(request.getDbName())
                 .tableName(request.getTableName())
                 .dataPath(request.getDataPath())
                 .fileFormat(request.getFileFormat())
-                .dataEncoding(request.getDataEncoding())
-                .dataSeparator(request.getDataSeparator())
-                .dataConsistency(request.getDataConsistency())
                 .properties(request.getProperties())
                 .build();
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java
index 1fe46cc56..88e799705 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java
@@ -49,8 +49,11 @@ public class IcebergSinkListResponse extends SinkListResponse {
     @ApiModelProperty("username")
     private String username;
 
-    @ApiModelProperty("JDBC URL")
-    private String jdbcUrl;
+    @ApiModelProperty("Catalog URI")
+    private String catalogUri;
+
+    @ApiModelProperty("Data warehouse dir")
+    private String warehouse;
 
     @ApiModelProperty("Data path, such as: hdfs://ip:port/user/hive/warehouse/test.db")
     private String dataPath;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
index 6938daed2..e4380f17d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
@@ -36,14 +36,11 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @JsonTypeDefine(value = SinkType.SINK_ICEBERG)
 public class IcebergSinkRequest extends SinkRequest {
 
-    @ApiModelProperty("Hive JDBC URL")
-    private String jdbcUrl;
+    @ApiModelProperty("Catalog URI")
+    private String catalogUri;
 
-    @ApiModelProperty("Username for JDBC URL")
-    private String username;
-
-    @ApiModelProperty("User password")
-    private String password;
+    @ApiModelProperty("Iceberg warehouse dir")
+    private String warehouse;
 
     @ApiModelProperty("Target database name")
     private String dbName;
@@ -57,13 +54,4 @@ public class IcebergSinkRequest extends SinkRequest {
     @ApiModelProperty("File format, support: Parquet, Orc, Avro")
     private String fileFormat;
 
-    @ApiModelProperty("Data encoding type")
-    private String dataEncoding;
-
-    @ApiModelProperty("Data field separator")
-    private String dataSeparator;
-
-    @ApiModelProperty("Data consistency strategy, support: EXACTLY_ONCE(default), AT_LEAST_ONCE")
-    private String dataConsistency;
-
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
index 7f3e1e4bb..bb1857eb8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
@@ -89,8 +89,8 @@ public class IcebergResourceOperator implements SinkResourceOperator {
         }
         IcebergTableInfo tableInfo = IcebergSinkDTO.getIcebergTableInfo(icebergInfo, columnInfoList);
 
-        String metastoreUri = icebergInfo.getJdbcUrl();
-        String warehouse = icebergInfo.getDataPath();
+        String metastoreUri = icebergInfo.getCatalogUri();
+        String warehouse = icebergInfo.getWarehouse();
         String dbName = icebergInfo.getDbName();
         String tableName = icebergInfo.getTableName();
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index ffa9054cd..69f1d2070 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSink;
 import org.apache.inlong.manager.common.pojo.sink.hbase.HBaseSink;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSink;
+import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSink;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSink;
 import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSink;
 import org.apache.inlong.sort.protocol.FieldInfo;
@@ -40,6 +41,7 @@ import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
@@ -79,6 +81,8 @@ public class LoadNodeUtils {
                 return createLoadNode((PostgresSink) streamSink);
             case CLICKHOUSE:
                 return createLoadNode((ClickHouseSink) streamSink);
+            case ICEBERG:
+                return createLoadNode((IcebergSink) streamSink);
             default:
                 throw new IllegalArgumentException(
                         String.format("Unsupported sinkType=%s to create loadNode", sinkType));
@@ -253,6 +257,29 @@ public class LoadNodeUtils {
                 ckSink.getPassword());
     }
 
+    /**
+     * Create the Iceberg load node for the given Iceberg sink; the sink name is used as both node id and name.
+     */
+    public static IcebergLoadNode createLoadNode(IcebergSink icebergSink) {
+        String id = icebergSink.getSinkName();
+        String name = icebergSink.getSinkName();
+        String dbName = icebergSink.getDbName();
+        String tableName = icebergSink.getTableName();
+        String uri = icebergSink.getCatalogUri();
+        String warehouse = icebergSink.getWarehouse();
+        // NOTE(review): field list, properties and property values are dereferenced without null checks.
+        List<SinkField> sinkFields = icebergSink.getFieldList();
+        List<FieldInfo> fields = sinkFields.stream()
+                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
+                .collect(Collectors.toList());
+        List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFields, name);
+        Map<String, String> properties = icebergSink.getProperties().entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+        // NOTE(review): the nulls after tableName are presumably primary key and catalog type - TODO confirm.
+        return new IcebergLoadNode(id, name, fields, fieldRelationShips, null, null, 1, properties,
+                dbName, tableName, null, null, uri, warehouse);
+    }
+
     /**
      * Parse information field of data sink.
      */
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index b7aa19df4..bf796207a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -34,6 +34,7 @@ import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
 import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
@@ -64,6 +65,7 @@ import java.util.Map;
         @JsonSubTypes.Type(value = SqlServerLoadNode.class, name = "sqlserverLoad"),
         @JsonSubTypes.Type(value = TDSQLPostgresLoadNode.class, name = "tdsqlPostgresLoad"),
         @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad"),
+        @JsonSubTypes.Type(value = IcebergLoadNode.class, name = "icebergLoad"),
         @JsonSubTypes.Type(value = ElasticsearchLoadNode.class, name = "elasticsearchLoad"),
         @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad"),
         @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad")
@@ -98,13 +100,13 @@ public abstract class LoadNode implements Node {
 
     @JsonCreator
     public LoadNode(@JsonProperty("id") String id,
-                    @JsonProperty("name") String name,
-                    @JsonProperty("fields") List<FieldInfo> fields,
-                    @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
-                    @JsonProperty("filters") List<FilterFunction> filters,
-                    @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
-                    @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
-                    @Nullable @JsonProperty("properties") Map<String, String> properties) {
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+            @JsonProperty("filters") List<FilterFunction> filters,
+            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+            @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+            @Nullable @JsonProperty("properties") Map<String, String> properties) {
         this.id = Preconditions.checkNotNull(id, "id is null");
         this.name = name;
         this.fields = Preconditions.checkNotNull(fields, "fields is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
index 1a28cad2a..ff3355a99 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
@@ -51,8 +51,8 @@ public class HbaseLoadNode extends LoadNode implements Serializable {
     @JsonProperty("tableName")
     private String tableName;
 
-    @JsonProperty("nameSpace")
-    private String nameSpace;
+    @JsonProperty("namespace")
+    private String namespace;
 
     @JsonProperty("zookeeperQuorum")
     private String zookeeperQuorum;
@@ -82,7 +82,7 @@ public class HbaseLoadNode extends LoadNode implements Serializable {
             @JsonProperty("sinkParallelism") Integer sinkParallelism,
             @JsonProperty("properties") Map<String, String> properties,
             @JsonProperty("tableName") String tableName,
-            @JsonProperty("nameSpace") String nameSpace,
+            @JsonProperty("namespace") String namespace,
             @JsonProperty("zookeeperQuorum") String zookeeperQuorum,
             @JsonProperty("rowKey") String rowKey,
             @JsonProperty("sinkBufferFlushMaxSize") String sinkBufferFlushMaxSize,
@@ -91,7 +91,7 @@ public class HbaseLoadNode extends LoadNode implements Serializable {
             @JsonProperty("sinkBufferFlushInterval") String sinkBufferFlushInterval) {
         super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
         this.tableName = Preconditions.checkNotNull(tableName, "tableName of hbase is null");
-        this.nameSpace = Preconditions.checkNotNull(nameSpace, "nameSpace of hbase is null");
+        this.namespace = Preconditions.checkNotNull(namespace, "namespace of hbase is null");
         this.zookeeperQuorum = Preconditions.checkNotNull(zookeeperQuorum, "zookeeperQuorum of hbase is null");
         this.rowKey = Preconditions.checkNotNull(rowKey, "rowKey of hbase is null");
         this.sinkBufferFlushMaxSize = sinkBufferFlushMaxSize;
@@ -104,7 +104,7 @@ public class HbaseLoadNode extends LoadNode implements Serializable {
     public Map<String, String> tableOptions() {
         Map<String, String> map = super.tableOptions();
         map.put(HBaseConstant.CONNECTOR, HBaseConstant.HBASE_2);
-        map.put(HBaseConstant.TABLE_NAME, nameSpace + ":" + tableName);
+        map.put(HBaseConstant.TABLE_NAME, namespace + ":" + tableName);
         map.put(HBaseConstant.ZOOKEEPER_QUORUM, zookeeperQuorum);
         if (StringUtils.isNotEmpty(sinkBufferFlushInterval)) {
             map.put(HBaseConstant.SINK_BUFFER_FLUSH_INTERVAL, sinkBufferFlushInterval);
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
index 9775ac297..d7a2ebb94 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
@@ -25,6 +25,9 @@ import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import java.util.Arrays;
 
+/**
+ * Test for {@link HbaseLoadNode}
+ */
 public class HbaseLoadNodeTest extends SerializeBaseTest<HbaseLoadNode> {
 
     @Override
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java
similarity index 62%
copy from inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
copy to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java
index 9775ac297..bcddb2b2f 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java
@@ -21,18 +21,31 @@ package org.apache.inlong.sort.protocol.node.load;
 import org.apache.inlong.sort.SerializeBaseTest;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import java.util.Arrays;
 
-public class HbaseLoadNodeTest extends SerializeBaseTest<HbaseLoadNode> {
+/**
+ * Test for {@link IcebergLoadNode}
+ */
+public class IcebergLoadNodeTest extends SerializeBaseTest<IcebergLoadNode> {
 
     @Override
-    public HbaseLoadNode getTestObject() {
-        return new HbaseLoadNode("2", "test_hbase",
-                Arrays.asList(new FieldInfo("cf:id", new StringFormatInfo())),
+    public IcebergLoadNode getTestObject() {
+        return new IcebergLoadNode("1", "test_iceberg",
+                Arrays.asList(new FieldInfo("id", new StringFormatInfo())),
                 Arrays.asList(new FieldRelation(new FieldInfo("id", new StringFormatInfo()),
-                        new FieldInfo("cf:id", new StringFormatInfo()))), null, null, 1, null, "mytable", "default",
-                "localhost:2181", "MD5(`id`)", null, null, null, null);
+                        new FieldInfo("id", new StringFormatInfo()))),
+                null,
+                null,
+                1,
+                null,
+                "test_db",
+                "test_table",
+                "id",
+                CatalogType.HIVE,
+                "thrift://localhost:9083",
+                "hdfs://localhost:9000/user/iceberg/warehouse");
     }
 }