You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/14 01:41:27 UTC

[inlong] branch master updated: [INLONG-7844][Manager] Support to set cluster when create table for clickhouse (#7845)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3977a96ac [INLONG-7844][Manager] Support to set cluster when create table for clickhouse (#7845)
3977a96ac is described below

commit 3977a96ac9719a75af6a2acbc82f36a30345ddea
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Fri Apr 14 09:41:20 2023 +0800

    [INLONG-7844][Manager] Support to set cluster when create table for clickhouse (#7845)
---
 .../java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSink.java  | 3 +++
 .../org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java    | 5 +++++
 .../apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java    | 3 +++
 .../org/apache/inlong/manager/pojo/sink/ck/ClickHouseTableInfo.java  | 1 +
 .../manager/service/resource/sink/ck/ClickHouseSqlBuilder.java       | 4 +++-
 5 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSink.java
index e36a907e4..4f3942a1a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSink.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSink.java
@@ -93,6 +93,9 @@ public class ClickHouseSink extends StreamSink {
     @ApiModelProperty(value = "The unit of message's time-to-live duration")
     private String ttlUnit;
 
+    @ApiModelProperty(value = "The cluster is used to create distributed tables in each node")
+    private String cluster;
+
     @ApiModelProperty("Table primary key")
     private String primaryKey;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java
index 161060c65..0e133e33c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java
@@ -94,6 +94,9 @@ public class ClickHouseSinkDTO {
     @ApiModelProperty(value = "The unit of message's time-to-live duration")
     private String ttlUnit;
 
+    @ApiModelProperty(value = "The cluster is used to create distributed tables in each node")
+    private String cluster;
+
     @ApiModelProperty("Table primary key")
     private String primaryKey;
 
@@ -130,6 +133,7 @@ public class ClickHouseSinkDTO {
                 .partitionBy(request.getPartitionBy())
                 .ttl(request.getTtl())
                 .ttlUnit(request.getTtlUnit())
+                .cluster(request.getCluster())
                 .primaryKey(request.getPrimaryKey())
                 .orderBy(request.getOrderBy())
                 .encryptVersion(encryptVersion)
@@ -158,6 +162,7 @@ public class ClickHouseSinkDTO {
         tableInfo.setPrimaryKey(ckInfo.getPrimaryKey());
         tableInfo.setTtl(ckInfo.getTtl());
         tableInfo.setTtlUnit(ckInfo.getTtlUnit());
+        tableInfo.setCluster(ckInfo.getCluster());
         tableInfo.setFieldInfoList(fieldInfoList);
 
         return tableInfo;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
index 93430e109..e96adee74 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
@@ -90,6 +90,9 @@ public class ClickHouseSinkRequest extends SinkRequest {
     @ApiModelProperty(value = "The unit of message's time-to-live duration")
     private String ttlUnit;
 
+    @ApiModelProperty(value = "The cluster is used to create distributed tables in each node")
+    private String cluster;
+
     @ApiModelProperty("Table primary key")
     private String primaryKey;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseTableInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseTableInfo.java
index db7a06ac1..c6f8d9729 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseTableInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseTableInfo.java
@@ -38,6 +38,7 @@ public class ClickHouseTableInfo {
     private String primaryKey;
     private Integer ttl;
     private String ttlUnit;
+    private String cluster;
 
     private List<ClickHouseFieldInfo> fieldInfoList;
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseSqlBuilder.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseSqlBuilder.java
index 575232620..d8f38eda7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseSqlBuilder.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseSqlBuilder.java
@@ -54,7 +54,9 @@ public class ClickHouseSqlBuilder {
         // Support _ beginning with underscore
         String dbTableName = table.getDbName() + "." + table.getTableName();
         sql.append("CREATE TABLE ").append(dbTableName);
-
+        if (StringUtils.isNotBlank(table.getCluster())) {
+            sql.append(" ON CLUSTER ").append(table.getCluster());
+        }
         // add ttl columns
         if (table.getTtl() != null && StringUtils.isNotBlank(table.getTtlUnit())) {
             ClickHouseFieldInfo clickHouseFieldInfo = new ClickHouseFieldInfo();