You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/11 07:38:33 UTC

[incubator-inlong] branch master updated: [INLONG-3883][Manager] Support creating tables for ClickHouse (#4004)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b97eee7d [INLONG-3883][Manager] Support creating tables for ClickHouse (#4004)
5b97eee7d is described below

commit 5b97eee7dfeca92a7ce0739b0ba6076594a0af34
Author: lucaspeng12138 <10...@users.noreply.github.com>
AuthorDate: Wed May 11 15:38:27 2022 +0800

    [INLONG-3883][Manager] Support creating tables for ClickHouse (#4004)
    
    * support creating tables for ClickHouse
    
    * change default Engine as Log
    
    * delete drop dp, drop table, drop column
    
    * solve code format and add doc for desc
    
    * Remove unused import
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../common/pojo/sink/ck/ClickHouseColumnInfo.java  |  34 ++++
 .../common/pojo/sink/ck/ClickHouseSinkDTO.java     |  31 ++++
 .../common/pojo/sink/ck/ClickHouseSinkRequest.java |  12 ++
 .../common/pojo/sink/ck/ClickHouseTableInfo.java   |  37 ++++
 inlong-manager/manager-dao/pom.xml                 |   4 +
 .../service/resource/ck/ClickHouseJdbcUtils.java   | 165 +++++++++++++++++
 .../resource/ck/ClickHouseResourceOperator.java    | 138 ++++++++++++++
 .../service/resource/ck/ClickHouseSqlBuilder.java  | 149 +++++++++++++++
 .../licenses/LICENSE-clickhouse-jdbc.txt           | 202 +++++++++++++++++++++
 9 files changed, 772 insertions(+)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseColumnInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseColumnInfo.java
new file mode 100644
index 000000000..ce46fac1d
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseColumnInfo.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.ck;
+
+import lombok.Data;
+
+@Data
+public class ClickHouseColumnInfo {
+
+    private String name;
+    private String type;
+    private String desc;
+    private String defaultType;
+    private String defaultExpr;
+
+    private String compressionCode;
+
+    private String ttlExpr;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
index 2efecea8a..e18b8a92b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import javax.validation.constraints.NotNull;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -78,6 +79,18 @@ public class ClickHouseSinkDTO {
     @ApiModelProperty("Key field names, separate with commas")
     private String keyFieldNames;
 
+    @ApiModelProperty("Table engine, support MergeTree Mem and so on")
+    private String engine;
+
+    @ApiModelProperty("Table Partiion information")
+    private String partitionBy;
+
+    @ApiModelProperty("Table order information")
+    private String orderBy;
+
+    @ApiModelProperty("Table primary key")
+    private String primaryKey;
+
     @ApiModelProperty("Properties for clickhouse")
     private Map<String, Object> properties;
 
@@ -98,6 +111,10 @@ public class ClickHouseSinkDTO {
                 .partitionStrategy(request.getPartitionStrategy())
                 .partitionFields(request.getPartitionFields())
                 .keyFieldNames(request.getKeyFieldNames())
+                .engine(request.getEngine())
+                .partitionBy(request.getPartitionBy())
+                .primaryKey(request.getPrimaryKey())
+                .orderBy(request.getOrderBy())
                 .properties(request.getProperties())
                 .build();
     }
@@ -111,4 +128,18 @@ public class ClickHouseSinkDTO {
         }
     }
 
+    public static ClickHouseTableInfo getClickHouseTableInfo(ClickHouseSinkDTO ckInfo,
+            List<ClickHouseColumnInfo> columnList) {
+        ClickHouseTableInfo tableInfo = new ClickHouseTableInfo();
+        tableInfo.setDbName(ckInfo.getDbName());
+        tableInfo.setTableName(ckInfo.getTableName());
+        tableInfo.setEngine(ckInfo.getEngine());
+        tableInfo.setOrderBy(ckInfo.getOrderBy());
+        tableInfo.setPartitionBy(ckInfo.getPartitionBy());
+        tableInfo.setPrimaryKey(ckInfo.getPrimaryKey());
+        tableInfo.setColumns(columnList);
+
+        return tableInfo;
+    }
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkRequest.java
index 0d35fcf91..f4ebdc31d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkRequest.java
@@ -73,4 +73,16 @@ public class ClickHouseSinkRequest extends SinkRequest {
     @ApiModelProperty("Key field names, separate with commas")
     private String keyFieldNames;
 
+    @ApiModelProperty("table engine, support MergeTree Mem and so on")
+    private String engine;
+
+    @ApiModelProperty("Table Partiion information")
+    private String partitionBy;
+
+    @ApiModelProperty("Table order information")
+    private String orderBy;
+
+    @ApiModelProperty("Table primary key")
+    private String primaryKey;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseTableInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseTableInfo.java
new file mode 100644
index 000000000..b26ba7b53
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseTableInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.ck;
+
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class ClickHouseTableInfo {
+    // Basic attributes
+    private String dbName;
+    private String tableName;
+    private String tableDesc;
+
+    private String engine;
+    private String partitionBy;
+    private String orderBy;
+    private String primaryKey;
+
+    private List<ClickHouseColumnInfo> columns;
+}
diff --git a/inlong-manager/manager-dao/pom.xml b/inlong-manager/manager-dao/pom.xml
index 9cd40b868..75d200de8 100644
--- a/inlong-manager/manager-dao/pom.xml
+++ b/inlong-manager/manager-dao/pom.xml
@@ -233,6 +233,10 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseJdbcUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseJdbcUtils.java
new file mode 100644
index 000000000..e5a594ad9
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseJdbcUtils.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.ck;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseColumnInfo;
+import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ru.yandex.clickhouse.ClickHouseDatabaseMetadata;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utils for ClickHouse JDBC.
+ */
+public class ClickHouseJdbcUtils {
+
+    private static final String CLICKHOUSE_DRIVER_CLASS = "ru.yandex.clickhouse.ClickHouseDriver";
+    private static final String METADATA_TYPE = "TABLE";
+    private static final String COLUMN_LABEL = "TABLE_NAME";
+    private static final String CLICKHOUSE_JDBC_PREFIX = "jdbc:clickhouse";
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClickHouseJdbcUtils.class);
+
+    /**
+     * Get ClickHouse connection from clickhouse url and user
+     */
+    public static Connection getConnection(String url, String user, String password) throws Exception {
+        if (StringUtils.isBlank(url) || !url.startsWith(CLICKHOUSE_JDBC_PREFIX)) {
+            throw new Exception("ClickHouse server URL was invalid, it should start with jdbc:clickhouse");
+        }
+        Connection conn;
+        try {
+            Class.forName(CLICKHOUSE_DRIVER_CLASS);
+            conn = DriverManager.getConnection(url, user, password);
+        } catch (Exception e) {
+            LOG.error("get clickhouse connection error, please check clickhouse jdbc url, username or password", e);
+            throw new Exception("get clickhouse connection error, please check jdbc url, username or password. "
+                    + "other error msg: " + e.getMessage());
+        }
+
+        if (conn == null) {
+            throw new Exception("get clickhouse connection failed, please contact administrator");
+        }
+
+        LOG.info("get clickhouse connection success, url={}", url);
+        return conn;
+    }
+
+    /**
+     * Execute One ClickHouse Sql command
+     */
+    public static void executeSql(String sql, String url, String user, String password) throws Exception {
+        try (Connection conn = getConnection(url, user, password)) {
+            Statement stmt = conn.createStatement();
+            stmt.execute(sql);
+            LOG.info("execute sql [{}] success for url: {}", sql, url);
+        }
+    }
+
+    /**
+     * Execute Batch ClickHouse Sql commands
+     */
+    public static void executeSqlBatch(List<String> sql, String url, String user, String password) throws Exception {
+        try (Connection conn = getConnection(url, user, password)) {
+            Statement stmt = conn.createStatement();
+            for (String entry : sql) {
+                stmt.execute(entry);
+            }
+            LOG.info("execute sql [{}] success for url: {}", sql, url);
+        }
+    }
+
+    /**
+     * Create ClickHouse database
+     */
+    public static void createDb(String url, String user, String password, String dbName) throws Exception {
+        String createDbSql = ClickHouseSqlBuilder.buildCreateDbSql(dbName);
+        executeSql(createDbSql, url, user, password);
+    }
+
+    /**
+     * Create ClickHouse table
+     */
+    public static void createTable(String url, String user, String password,
+            ClickHouseTableInfo tableInfo) throws Exception {
+        String createTableSql = ClickHouseSqlBuilder.buildCreateTableSql(tableInfo);
+        ClickHouseJdbcUtils.executeSql(createTableSql, url, user, password);
+    }
+
+    /**
+     * Get ClickHouse tables from the ClickHouse metadata
+     */
+    public static List<String> getTables(String url, String user, String password, String dbname) throws Exception {
+        List<String> tables = new ArrayList<>();
+        try (Connection conn = getConnection(url, user, password)) {
+            ClickHouseDatabaseMetadata metaData = (ClickHouseDatabaseMetadata) conn.getMetaData();
+            LOG.info("dbname is {}", dbname);
+            ResultSet rs = metaData.getTables(dbname, dbname, null, new String[]{"TABLE"});
+            while (rs.next()) {
+                String tableName = rs.getString(COLUMN_LABEL);
+                tables.add(tableName);
+            }
+            rs.close();
+        }
+        return tables;
+    }
+
+    /**
+     * Query ClickHouse columns
+     */
+    public static List<ClickHouseColumnInfo> getColumns(String url, String user, String password, String dbName,
+            String tableName) throws Exception {
+
+        String querySql = ClickHouseSqlBuilder.buildDescTableSql(dbName, tableName);
+        try (Connection conn = getConnection(url, user, password);
+                Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(querySql)) {
+            List<ClickHouseColumnInfo> columnList = new ArrayList<>();
+            while (rs.next()) {
+                ClickHouseColumnInfo columnInfo = new ClickHouseColumnInfo();
+                columnInfo.setName(rs.getString(1));
+                columnInfo.setType(rs.getString(2));
+                columnInfo.setDefaultType(rs.getString(3));
+                columnInfo.setDefaultExpr(rs.getString(4));
+                columnInfo.setDesc(rs.getString(5));
+                columnInfo.setCompressionCode(rs.getString(6));
+                columnInfo.setTtlExpr(rs.getString(7));
+                columnList.add(columnInfo);
+            }
+            return columnList;
+        }
+    }
+
+    /**
+     * Add columns for ClickHouse table
+     */
+    public static void addColumns(String url, String user, String password, String dbName, String tableName,
+            List<ClickHouseColumnInfo> columnList) throws Exception {
+        List<String> addColumnSql = ClickHouseSqlBuilder.buildAddColumnsSql(dbName, tableName, columnList);
+        ClickHouseJdbcUtils.executeSqlBatch(addColumnSql, url, user, password);
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseResourceOperator.java
new file mode 100644
index 000000000..a0e363cab
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseResourceOperator.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.ck;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseColumnInfo;
+import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseTableInfo;
+import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkDTO;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.service.resource.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+
+@Service
+public class ClickHouseResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseResourceOperator.class);
+    @Autowired
+    private StreamSinkService sinkService;
+    @Autowired
+    private StreamSinkFieldEntityMapper clickHouseFieldMapper;
+
+    @Override
+    public Boolean accept(SinkType sinkType) {
+        return SinkType.CLICKHOUSE == sinkType;
+    }
+
+    @Override
+    public void createSinkResource(SinkInfo sinkInfo) {
+        if (sinkInfo == null) {
+            LOGGER.warn("sink info was null, skip to create resource");
+            return;
+        }
+
+        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+            LOGGER.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
+            return;
+        } else if (GlobalConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+            LOGGER.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
+            return;
+        }
+
+        this.createTable(sinkInfo);
+    }
+
+    private void createTable(SinkInfo sinkInfo) {
+        LOGGER.info("begin to create clickhouse table for sinkId={}", sinkInfo.getId());
+
+        List<StreamSinkFieldEntity> fieldList = clickHouseFieldMapper.selectBySinkId(sinkInfo.getId());
+        if (CollectionUtils.isEmpty(fieldList)) {
+            LOGGER.warn("no clickhouse fields found, skip to create table for sinkId={}", sinkInfo.getId());
+        }
+
+        // set columns
+        List<ClickHouseColumnInfo> columnList = new ArrayList<>();
+        for (StreamSinkFieldEntity field : fieldList) {
+            ClickHouseColumnInfo columnInfo = new ClickHouseColumnInfo();
+            columnInfo.setName(field.getFieldName());
+            columnInfo.setType(field.getFieldType());
+            columnInfo.setDesc(field.getFieldComment());
+            columnList.add(columnInfo);
+        }
+
+        try {
+            ClickHouseSinkDTO ckInfo = ClickHouseSinkDTO.getFromJson(sinkInfo.getExtParams());
+            ClickHouseTableInfo tableInfo = ClickHouseSinkDTO.getClickHouseTableInfo(ckInfo, columnList);
+            String url = ckInfo.getJdbcUrl();
+            String user = ckInfo.getUsername();
+            String password = ckInfo.getPassword();
+
+            String dbName = tableInfo.getDbName();
+            String tableName = tableInfo.getTableName();
+
+            // 1. create database if not exists
+            ClickHouseJdbcUtils.createDb(url, user, password, dbName);
+
+            // 2. check if the table exists
+            List<String> tables = ClickHouseJdbcUtils.getTables(url, user, password, dbName);
+            boolean tableExists = tables.contains(tableName);
+
+            // 3. table not exists, create it
+            if (!tableExists) {
+                ClickHouseJdbcUtils.createTable(url, user, password, tableInfo);
+            } else {
+                // 4. table exists, add columns - skip the exists columns
+                List<ClickHouseColumnInfo> existColumns = ClickHouseJdbcUtils.getColumns(url,
+                        user, password, dbName, tableName);
+                List<ClickHouseColumnInfo> needAddColumns = tableInfo.getColumns().stream()
+                        .skip(existColumns.size()).collect(toList());
+                if (CollectionUtils.isNotEmpty(needAddColumns)) {
+                    ClickHouseJdbcUtils.addColumns(url, user, password, dbName, tableName, needAddColumns);
+                }
+            }
+
+            // 5. update the sink status to success
+            String info = "success to create clickhouse resource";
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+            LOGGER.info(info + " for sinkInfo={}", sinkInfo);
+        } catch (Throwable e) {
+            String errMsg = "create clickhouse table failed: " + e.getMessage();
+            LOGGER.error(errMsg, e);
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+            throw new WorkflowException(errMsg);
+        }
+
+        LOGGER.info("success create ClickHouse table for data sind [" + sinkInfo.getId() + "]");
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseSqlBuilder.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseSqlBuilder.java
new file mode 100644
index 000000000..5ab3f9f85
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseSqlBuilder.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.ck;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseColumnInfo;
+import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Builder for ClickHouse SQL string
+ */
+public class ClickHouseSqlBuilder {
+
+    private static final int FIRST_COLUMN_INDEX = 0;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSqlBuilder.class);
+
+    /**
+     * Build create database SQL
+     */
+    public static String buildCreateDbSql(String dbName) {
+        // Support _ beginning with underscore
+        String sql = "CREATE DATABASE IF NOT EXISTS " + dbName;
+        LOGGER.info("create db sql: {}", sql);
+        return sql;
+    }
+
+    /**
+     * Build create table SQL
+     */
+    public static String buildCreateTableSql(ClickHouseTableInfo table) {
+        StringBuilder sql = new StringBuilder();
+        // Support _ beginning with underscore
+        String dbTableName = table.getDbName() + "." + table.getTableName();
+        sql.append("CREATE TABLE ").append(dbTableName);
+
+        // Construct columns and partition columns
+        sql.append(buildCreateColumnsSql(table.getColumns()));
+        if (StringUtils.isNotEmpty(table.getEngine())) {
+            sql.append(" ENGINE = ").append(table.getEngine());
+        } else {
+            sql.append(" ENGINE = MergeTree()");
+        }
+        if (StringUtils.isNotEmpty(table.getOrderBy())) {
+            sql.append(" ORDER BY ").append(table.getOrderBy());
+        } else if (StringUtils.isEmpty(table.getEngine())) {
+            sql.append(" ORDER BY ").append(table.getColumns()
+                    .get(FIRST_COLUMN_INDEX).getName());
+        }
+        if (StringUtils.isNotEmpty(table.getPartitionBy())) {
+            sql.append(" PARTITION BY ").append(table.getPartitionBy());
+        }
+        if (StringUtils.isNotEmpty(table.getPrimaryKey())) {
+            sql.append(" PRIMARY KEY ").append(table.getPrimaryKey());
+        }
+        if (StringUtils.isNotEmpty(table.getTableDesc())) {
+            sql.append(" COMMENT '").append(table.getTableDesc()).append("'");
+        }
+
+        LOGGER.info("create table sql: {}", sql);
+        return sql.toString();
+    }
+
+    /**
+     * Build add column SQL
+     */
+    public static List<String> buildAddColumnsSql(String dbName, String tableName,
+            List<ClickHouseColumnInfo> columnList) {
+        String dbTableName = dbName + "." + tableName;
+        List<String> columnInfoList = getColumnsInfo(columnList);
+        List<String> resultList = new ArrayList<>();
+        for (String columnInfo : columnInfoList) {
+            StringBuilder sql = new StringBuilder().append("ALTER TABLE ")
+                    .append(dbTableName).append(" ADD COLUMN ").append(columnInfo);
+            resultList.add(sql.toString());
+        }
+        return resultList;
+    }
+
+    /**
+     * Build create column SQL
+     */
+    private static String buildCreateColumnsSql(List<ClickHouseColumnInfo> columns) {
+        List<String> columnList = getColumnsInfo(columns);
+        StringBuilder result = new StringBuilder().append(" (")
+                .append(StringUtils.join(columnList, ",")).append(") ");
+        return result.toString();
+    }
+
+    /**
+     * Build column info
+     */
+    private static List<String> getColumnsInfo(List<ClickHouseColumnInfo> columns) {
+        List<String> columnList = new ArrayList<>();
+        List<String> partitionList = new ArrayList<>();
+        for (ClickHouseColumnInfo columnInfo : columns) {
+            // Construct columns and partition columns
+            StringBuilder columnStr = new StringBuilder().append(columnInfo.getName())
+                    .append(" ").append(columnInfo.getType());
+            if (StringUtils.isNotEmpty(columnInfo.getDefaultType())) {
+                columnStr.append(" ").append(columnInfo.getDefaultType())
+                        .append(" ").append(columnInfo.getDefaultExpr());
+            }
+            if (StringUtils.isNotEmpty(columnInfo.getCompressionCode())) {
+                columnStr.append(" CODEC(").append(columnInfo.getDesc()).append(")");
+            }
+            if (StringUtils.isNotEmpty(columnInfo.getTtlExpr())) {
+                columnStr.append(" TTL ").append(columnInfo.getTtlExpr());
+            }
+            if (StringUtils.isNotEmpty(columnInfo.getDesc())) {
+                columnStr.append(" COMMENT '").append(columnInfo.getDesc()).append("'");
+            }
+            columnList.add(columnStr.toString());
+        }
+        return columnList;
+    }
+
+    /**
+     * Build query table SQL
+     */
+    public static String buildDescTableSql(String dbName, String tableName) {
+        StringBuilder sql = new StringBuilder();
+        String fullTableName = dbName + "." + tableName;
+        sql.append("DESC ").append(fullTableName);
+
+        LOGGER.info("desc table sql={}", sql);
+        return sql.toString();
+    }
+}
diff --git a/licenses/inlong-manager/licenses/LICENSE-clickhouse-jdbc.txt b/licenses/inlong-manager/licenses/LICENSE-clickhouse-jdbc.txt
new file mode 100644
index 000000000..d64569567
--- /dev/null
+++ b/licenses/inlong-manager/licenses/LICENSE-clickhouse-jdbc.txt
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.