You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/01 13:26:31 UTC
[incubator-inlong] branch master updated: [INLONG-4408][Manager][Sort] Add Iceberg sink load node (#4409)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 04d7a5f1c [INLONG-4408][Manager][Sort] Add Iceberg sink load node (#4409)
04d7a5f1c is described below
commit 04d7a5f1c74d532f680e95d4e7e80401cdb81c05
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Jun 1 21:26:26 2022 +0800
[INLONG-4408][Manager][Sort] Add Iceberg sink load node (#4409)
---
.../common/pojo/sink/iceberg/IcebergSink.java | 20 ++++------------
.../common/pojo/sink/iceberg/IcebergSinkDTO.java | 28 +++++-----------------
.../pojo/sink/iceberg/IcebergSinkListResponse.java | 7 ++++--
.../pojo/sink/iceberg/IcebergSinkRequest.java | 20 ++++------------
.../resource/iceberg/IcebergResourceOperator.java | 4 ++--
.../manager/service/sort/util/LoadNodeUtils.java | 27 +++++++++++++++++++++
.../apache/inlong/sort/protocol/node/LoadNode.java | 16 +++++++------
.../sort/protocol/node/load/HbaseLoadNode.java | 10 ++++----
.../sort/protocol/node/load/HbaseLoadNodeTest.java | 3 +++
...eLoadNodeTest.java => IcebergLoadNodeTest.java} | 25 ++++++++++++++-----
10 files changed, 84 insertions(+), 76 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
index 65a9555ea..0b9ed63c4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSink.java
@@ -42,14 +42,11 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@JsonTypeDefine(value = SinkType.SINK_ICEBERG)
public class IcebergSink extends StreamSink {
- @ApiModelProperty("Hive JDBC URL")
- private String jdbcUrl;
+ @ApiModelProperty("Catalog URI")
+ private String catalogUri;
- @ApiModelProperty("Username for JDBC URL")
- private String username;
-
- @ApiModelProperty("User password")
- private String password;
+ @ApiModelProperty("Data warehouse")
+ private String warehouse;
@ApiModelProperty("Target database name")
private String dbName;
@@ -63,15 +60,6 @@ public class IcebergSink extends StreamSink {
@ApiModelProperty("File format, support: Parquet, Orc, Avro")
private String fileFormat;
- @ApiModelProperty("Data encoding type")
- private String dataEncoding;
-
- @ApiModelProperty("Data field separator")
- private String dataSeparator;
-
- @ApiModelProperty("Data consistency strategy, support: EXACTLY_ONCE(default), AT_LEAST_ONCE")
- private String dataConsistency;
-
public IcebergSink() {
this.setSinkType(SinkType.SINK_ICEBERG);
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
index 53e48fa50..1d713eb53 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -42,14 +42,11 @@ public class IcebergSinkDTO {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- @ApiModelProperty("Hive JDBC URL")
- private String jdbcUrl;
+ @ApiModelProperty("Catalog uri, such as hive metastore thrift://ip:port")
+ private String catalogUri;
- @ApiModelProperty("Username for JDBC URL")
- private String username;
-
- @ApiModelProperty("User password")
- private String password;
+ @ApiModelProperty("Iceberg data warehouse dir")
+ private String warehouse;
@ApiModelProperty("Target database name")
private String dbName;
@@ -63,15 +60,6 @@ public class IcebergSinkDTO {
@ApiModelProperty("File format, support: Parquet, Orc, Avro")
private String fileFormat;
- @ApiModelProperty("Data encoding type")
- private String dataEncoding;
-
- @ApiModelProperty("Data field separator")
- private String dataSeparator;
-
- @ApiModelProperty("Data consistency strategy, support: EXACTLY_ONCE(default), AT_LEAST_ONCE")
- private String dataConsistency;
-
@ApiModelProperty("Properties for iceberg")
private Map<String, Object> properties;
@@ -80,16 +68,12 @@ public class IcebergSinkDTO {
*/
public static IcebergSinkDTO getFromRequest(IcebergSinkRequest request) {
return IcebergSinkDTO.builder()
- .jdbcUrl(request.getJdbcUrl())
- .username(request.getUsername())
- .password(request.getPassword())
+ .catalogUri(request.getCatalogUri())
+ .warehouse(request.getWarehouse())
.dbName(request.getDbName())
.tableName(request.getTableName())
.dataPath(request.getDataPath())
.fileFormat(request.getFileFormat())
- .dataEncoding(request.getDataEncoding())
- .dataSeparator(request.getDataSeparator())
- .dataConsistency(request.getDataConsistency())
.properties(request.getProperties())
.build();
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java
index 1fe46cc56..88e799705 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkListResponse.java
@@ -49,8 +49,11 @@ public class IcebergSinkListResponse extends SinkListResponse {
@ApiModelProperty("username")
private String username;
- @ApiModelProperty("JDBC URL")
- private String jdbcUrl;
+ @ApiModelProperty("Catalog URI")
+ private String catalogUri;
+
+ @ApiModelProperty("Data warehouse dir")
+ private String warehouse;
@ApiModelProperty("Data path, such as: hdfs://ip:port/user/hive/warehouse/test.db")
private String dataPath;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
index 6938daed2..e4380f17d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
@@ -36,14 +36,11 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@JsonTypeDefine(value = SinkType.SINK_ICEBERG)
public class IcebergSinkRequest extends SinkRequest {
- @ApiModelProperty("Hive JDBC URL")
- private String jdbcUrl;
+ @ApiModelProperty("Catalog URI")
+ private String catalogUri;
- @ApiModelProperty("Username for JDBC URL")
- private String username;
-
- @ApiModelProperty("User password")
- private String password;
+ @ApiModelProperty("Iceberg warehouse dir")
+ private String warehouse;
@ApiModelProperty("Target database name")
private String dbName;
@@ -57,13 +54,4 @@ public class IcebergSinkRequest extends SinkRequest {
@ApiModelProperty("File format, support: Parquet, Orc, Avro")
private String fileFormat;
- @ApiModelProperty("Data encoding type")
- private String dataEncoding;
-
- @ApiModelProperty("Data field separator")
- private String dataSeparator;
-
- @ApiModelProperty("Data consistency strategy, support: EXACTLY_ONCE(default), AT_LEAST_ONCE")
- private String dataConsistency;
-
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
index 7f3e1e4bb..bb1857eb8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
@@ -89,8 +89,8 @@ public class IcebergResourceOperator implements SinkResourceOperator {
}
IcebergTableInfo tableInfo = IcebergSinkDTO.getIcebergTableInfo(icebergInfo, columnInfoList);
- String metastoreUri = icebergInfo.getJdbcUrl();
- String warehouse = icebergInfo.getDataPath();
+ String metastoreUri = icebergInfo.getCatalogUri();
+ String warehouse = icebergInfo.getWarehouse();
String dbName = icebergInfo.getDbName();
String tableName = icebergInfo.getTableName();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index ffa9054cd..69f1d2070 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSink;
import org.apache.inlong.manager.common.pojo.sink.hbase.HBaseSink;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSink;
+import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSink;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSink;
import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSink;
import org.apache.inlong.sort.protocol.FieldInfo;
@@ -40,6 +41,7 @@ import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
@@ -79,6 +81,8 @@ public class LoadNodeUtils {
return createLoadNode((PostgresSink) streamSink);
case CLICKHOUSE:
return createLoadNode((ClickHouseSink) streamSink);
+ case ICEBERG:
+ return createLoadNode((IcebergSink) streamSink);
default:
throw new IllegalArgumentException(
String.format("Unsupported sinkType=%s to create loadNode", sinkType));
@@ -253,6 +257,29 @@ public class LoadNodeUtils {
ckSink.getPassword());
}
+ /**
+ * create iceberg load node
+ */
+ public static IcebergLoadNode createLoadNode(IcebergSink icebergSink) {
+ String id = icebergSink.getSinkName();
+ String name = icebergSink.getSinkName();
+ String dbName = icebergSink.getDbName();
+ String tableName = icebergSink.getTableName();
+ String uri = icebergSink.getCatalogUri();
+ String warehouse = icebergSink.getWarehouse();
+
+ List<SinkField> sinkFields = icebergSink.getFieldList();
+ List<FieldInfo> fields = sinkFields.stream()
+ .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
+ .collect(Collectors.toList());
+ List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFields, name);
+ Map<String, String> properties = icebergSink.getProperties().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+
+ return new IcebergLoadNode(id, name, fields, fieldRelationShips, null, null, 1, properties,
+ dbName, tableName, null, null, uri, warehouse);
+ }
+
/**
* Parse information field of data sink.
*/
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index b7aa19df4..bf796207a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -34,6 +34,7 @@ import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
@@ -64,6 +65,7 @@ import java.util.Map;
@JsonSubTypes.Type(value = SqlServerLoadNode.class, name = "sqlserverLoad"),
@JsonSubTypes.Type(value = TDSQLPostgresLoadNode.class, name = "tdsqlPostgresLoad"),
@JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad"),
+ @JsonSubTypes.Type(value = IcebergLoadNode.class, name = "icebergLoad"),
@JsonSubTypes.Type(value = ElasticsearchLoadNode.class, name = "elasticsearchLoad"),
@JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad"),
@JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad")
@@ -98,13 +100,13 @@ public abstract class LoadNode implements Node {
@JsonCreator
public LoadNode(@JsonProperty("id") String id,
- @JsonProperty("name") String name,
- @JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
- @JsonProperty("filters") List<FilterFunction> filters,
- @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
- @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
- @Nullable @JsonProperty("properties") Map<String, String> properties) {
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+ @JsonProperty("filters") List<FilterFunction> filters,
+ @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+ @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+ @Nullable @JsonProperty("properties") Map<String, String> properties) {
this.id = Preconditions.checkNotNull(id, "id is null");
this.name = name;
this.fields = Preconditions.checkNotNull(fields, "fields is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
index 1a28cad2a..ff3355a99 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
@@ -51,8 +51,8 @@ public class HbaseLoadNode extends LoadNode implements Serializable {
@JsonProperty("tableName")
private String tableName;
- @JsonProperty("nameSpace")
- private String nameSpace;
+ @JsonProperty("namespace")
+ private String namespace;
@JsonProperty("zookeeperQuorum")
private String zookeeperQuorum;
@@ -82,7 +82,7 @@ public class HbaseLoadNode extends LoadNode implements Serializable {
@JsonProperty("sinkParallelism") Integer sinkParallelism,
@JsonProperty("properties") Map<String, String> properties,
@JsonProperty("tableName") String tableName,
- @JsonProperty("nameSpace") String nameSpace,
+ @JsonProperty("namespace") String namespace,
@JsonProperty("zookeeperQuorum") String zookeeperQuorum,
@JsonProperty("rowKey") String rowKey,
@JsonProperty("sinkBufferFlushMaxSize") String sinkBufferFlushMaxSize,
@@ -91,7 +91,7 @@ public class HbaseLoadNode extends LoadNode implements Serializable {
@JsonProperty("sinkBufferFlushInterval") String sinkBufferFlushInterval) {
super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.tableName = Preconditions.checkNotNull(tableName, "tableName of hbase is null");
- this.nameSpace = Preconditions.checkNotNull(nameSpace, "nameSpace of hbase is null");
+ this.namespace = Preconditions.checkNotNull(namespace, "namespace of hbase is null");
this.zookeeperQuorum = Preconditions.checkNotNull(zookeeperQuorum, "zookeeperQuorum of hbase is null");
this.rowKey = Preconditions.checkNotNull(rowKey, "rowKey of hbase is null");
this.sinkBufferFlushMaxSize = sinkBufferFlushMaxSize;
@@ -104,7 +104,7 @@ public class HbaseLoadNode extends LoadNode implements Serializable {
public Map<String, String> tableOptions() {
Map<String, String> map = super.tableOptions();
map.put(HBaseConstant.CONNECTOR, HBaseConstant.HBASE_2);
- map.put(HBaseConstant.TABLE_NAME, nameSpace + ":" + tableName);
+ map.put(HBaseConstant.TABLE_NAME, namespace + ":" + tableName);
map.put(HBaseConstant.ZOOKEEPER_QUORUM, zookeeperQuorum);
if (StringUtils.isNotEmpty(sinkBufferFlushInterval)) {
map.put(HBaseConstant.SINK_BUFFER_FLUSH_INTERVAL, sinkBufferFlushInterval);
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
index 9775ac297..d7a2ebb94 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
@@ -25,6 +25,9 @@ import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import java.util.Arrays;
+/**
+ * Test for {@link HbaseLoadNode}
+ */
public class HbaseLoadNodeTest extends SerializeBaseTest<HbaseLoadNode> {
@Override
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java
similarity index 62%
copy from inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
copy to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java
index 9775ac297..bcddb2b2f 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNodeTest.java
@@ -21,18 +21,31 @@ package org.apache.inlong.sort.protocol.node.load;
import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import java.util.Arrays;
-public class HbaseLoadNodeTest extends SerializeBaseTest<HbaseLoadNode> {
+/**
+ * Test for {@link IcebergLoadNode}
+ */
+public class IcebergLoadNodeTest extends SerializeBaseTest<IcebergLoadNode> {
@Override
- public HbaseLoadNode getTestObject() {
- return new HbaseLoadNode("2", "test_hbase",
- Arrays.asList(new FieldInfo("cf:id", new StringFormatInfo())),
+ public IcebergLoadNode getTestObject() {
+ return new IcebergLoadNode("1", "test_iceberg",
+ Arrays.asList(new FieldInfo("id", new StringFormatInfo())),
Arrays.asList(new FieldRelation(new FieldInfo("id", new StringFormatInfo()),
- new FieldInfo("cf:id", new StringFormatInfo()))), null, null, 1, null, "mytable", "default",
- "localhost:2181", "MD5(`id`)", null, null, null, null);
+ new FieldInfo("id", new StringFormatInfo()))),
+ null,
+ null,
+ 1,
+ null,
+ "test_db",
+ "test_table",
+ "id",
+ CatalogType.HIVE,
+ "thrift://localhost:9083",
+ "hdfs://localhost:9000/user/iceberg/warehouse");
}
}