You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/19 07:00:36 UTC

[incubator-inlong] branch master updated: [INLONG-2579][Manager] Support data flow to ClickHouse (#2581)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 720f68c  [INLONG-2579][Manager] Support data flow to ClickHouse (#2581)
720f68c is described below

commit 720f68c8054bd96621412928884a733bd0d1d2ea
Author: chestnufang <65...@users.noreply.github.com>
AuthorDate: Sat Feb 19 15:00:31 2022 +0800

    [INLONG-2579][Manager] Support data flow to ClickHouse (#2581)
    
    * [INLONG-2579][Manager] Support data flow to ClickHouse
    
    * [INLONG-2579][Manager] To use storage enum
---
 .../inlong/manager/common/enums/BizConstant.java   |   6 +-
 .../inlong/manager/common/enums/StorageType.java   |   2 +-
 .../pojo/datastorage/ck/ClickHouseStorageDTO.java  | 107 +++++++++++++++++++++
 .../ck/ClickHouseStorageListResponse.java          |  70 ++++++++++++++
 .../datastorage/ck/ClickHouseStorageRequest.java   |  75 +++++++++++++++
 .../datastorage/ck/ClickHouseStorageResponse.java  |  75 +++++++++++++++
 .../ClickHouseStorageOperation.java}               |  73 +++++++-------
 .../storage/iceberg/IcebergStorageOperation.java   |   5 +-
 .../storage/kafka/KafkaStorageOperation.java       |   5 +-
 .../service/core/ClickHouseStorageServiceTest.java |  95 ++++++++++++++++++
 10 files changed, 473 insertions(+), 40 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
index a700f15..40cf69c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
@@ -26,6 +26,10 @@ public class BizConstant {
 
     public static final String STORAGE_KAFKA = "KAFKA";
 
+    public static final String STORAGE_CLICKHOUSE = "CLICKHOUSE";
+
+    public static final String STORAGE_ICEBERG = "ICEBERG";
+
     public static final String DATA_SOURCE_DB = "DB";
 
     public static final String DATA_SOURCE_FILE = "FILE";
@@ -78,6 +82,4 @@ public class BizConstant {
 
     public static final Integer DISABLE_CREATE_RESOURCE = 0; // Disable create resource
 
-    public static final String STORAGE_ICEBERG = "ICEBERG";
-
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StorageType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StorageType.java
index d691aa8..234972d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StorageType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StorageType.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.common.enums;
 import java.util.Locale;
 
 public enum StorageType {
-    HIVE, ES, CLICKHOUSE;
+    HIVE, ES, CLICKHOUSE, ICEBERG, KAFKA;
 
     @Override
     public String toString() {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageDTO.java
new file mode 100644
index 0000000..ce0492e
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageDTO.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.datastorage.ck;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+/**
+ * ClickHouse storage info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ClickHouseStorageDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    @ApiModelProperty("ClickHouse JDBC URL")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Target database name")
+    private String databaseName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Whether distributed table")
+    private Boolean distributedTable;
+
+    @ApiModelProperty("Partition strategy,support: BALANCE, RANDOM, HASH")
+    private String partitionStrategy;
+
+    @ApiModelProperty("Partition key")
+    private String partitionKey;
+
+    @ApiModelProperty("Key field names")
+    private String[] keyFieldNames;
+
+    @ApiModelProperty("Flush interval")
+    private Integer flushInterval;
+
+    @ApiModelProperty("Flush record number")
+    private Integer flushRecordNumber;
+
+    @ApiModelProperty("Write max retry times")
+    private Integer writeMaxRetryTimes;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static ClickHouseStorageDTO getFromRequest(ClickHouseStorageRequest request) {
+        return ClickHouseStorageDTO.builder()
+                .jdbcUrl(request.getJdbcUrl())
+                .username(request.getUsername())
+                .password(request.getPassword())
+                .databaseName(request.getDatabaseName())
+                .tableName(request.getTableName())
+                .distributedTable(request.getDistributedTable())
+                .partitionStrategy(request.getPartitionStrategy())
+                .partitionKey(request.getPartitionKey())
+                .keyFieldNames(request.getKeyFieldNames())
+                .flushInterval(request.getFlushInterval())
+                .flushRecordNumber(request.getFlushRecordNumber())
+                .writeMaxRetryTimes(request.getWriteMaxRetryTimes())
+                .build();
+    }
+
+    public static ClickHouseStorageDTO getFromJson(@NotNull String extParams) {
+        try {
+            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            return OBJECT_MAPPER.readValue(extParams, ClickHouseStorageDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(BizErrorCodeEnum.STORAGE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageListResponse.java
new file mode 100644
index 0000000..982b7c4
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageListResponse.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.datastorage.ck;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageListResponse;
+
+/**
+ * Response of ClickHouse storage list
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Response of ClickHouse storage paging list")
+public class ClickHouseStorageListResponse extends StorageListResponse {
+
+    @ApiModelProperty("ClickHouse JDBC URL")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Target database name")
+    private String databaseName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Whether distributed table")
+    private Boolean distributedTable;
+
+    @ApiModelProperty("Partition strategy,support: BALANCE, RANDOM, HASH")
+    private String partitionStrategy;
+
+    @ApiModelProperty("Partition key")
+    private String partitionKey;
+
+    @ApiModelProperty("Key field names")
+    private String[] keyFieldNames;
+
+    @ApiModelProperty("Flush interval")
+    private Integer flushInterval;
+
+    @ApiModelProperty("Flush record number")
+    private Integer flushRecordNumber;
+
+    @ApiModelProperty("Write max retry times")
+    private Integer writeMaxRetryTimes;
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageRequest.java
new file mode 100644
index 0000000..8bf4258
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageRequest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.datastorage.ck;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of the ClickHouse storage info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the ClickHouse storage info")
+@JsonTypeDefine(value = BizConstant.STORAGE_CLICKHOUSE)
+public class ClickHouseStorageRequest extends StorageRequest {
+
+    @ApiModelProperty("ClickHouse JDBC URL")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Target database name")
+    private String databaseName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Whether distributed table")
+    private Boolean distributedTable;
+
+    @ApiModelProperty("Partition strategy,support: BALANCE, RANDOM, HASH")
+    private String partitionStrategy;
+
+    @ApiModelProperty("Partition key")
+    private String partitionKey;
+
+    @ApiModelProperty("Key field names")
+    private String[] keyFieldNames;
+
+    @ApiModelProperty("Flush interval")
+    private Integer flushInterval;
+
+    @ApiModelProperty("Flush record number")
+    private Integer flushRecordNumber;
+
+    @ApiModelProperty("Write max retry times")
+    private Integer writeMaxRetryTimes;
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageResponse.java
new file mode 100644
index 0000000..2776414
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageResponse.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.datastorage.ck;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageResponse;
+
+/**
+ * Response of the ClickHouse storage
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Response of the ClickHouse storage")
+public class ClickHouseStorageResponse extends StorageResponse {
+
+    private String storageType = BizConstant.STORAGE_CLICKHOUSE;
+
+    @ApiModelProperty("ClickHouse JDBC URL")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Target database name")
+    private String databaseName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Whether distributed table")
+    private Boolean distributedTable;
+
+    @ApiModelProperty("Partition strategy,support: BALANCE, RANDOM, HASH")
+    private String partitionStrategy;
+
+    @ApiModelProperty("Partition key")
+    private String partitionKey;
+
+    @ApiModelProperty("Key field names")
+    private String[] keyFieldNames;
+
+    @ApiModelProperty("Flush interval")
+    private Integer flushInterval;
+
+    @ApiModelProperty("Flush record number")
+    private Integer flushRecordNumber;
+
+    @ApiModelProperty("Write max retry times")
+    private Integer writeMaxRetryTimes;
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/iceberg/IcebergStorageOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/ck/ClickHouseStorageOperation.java
similarity index 78%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/iceberg/IcebergStorageOperation.java
copy to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/ck/ClickHouseStorageOperation.java
index c93b693..341e44e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/iceberg/IcebergStorageOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/ck/ClickHouseStorageOperation.java
@@ -15,26 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.storage.iceberg;
+package org.apache.inlong.manager.service.storage.ck;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageInfo;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Supplier;
+import javax.validation.constraints.NotNull;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.BizConstant;
 import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.enums.StorageType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageFieldRequest;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageFieldResponse;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageListResponse;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageRequest;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageResponse;
-import org.apache.inlong.manager.common.pojo.datastorage.iceberg.IcebergStorageDTO;
-import org.apache.inlong.manager.common.pojo.datastorage.iceberg.IcebergStorageListResponse;
-import org.apache.inlong.manager.common.pojo.datastorage.iceberg.IcebergStorageRequest;
-import org.apache.inlong.manager.common.pojo.datastorage.iceberg.IcebergStorageResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.ck.ClickHouseStorageDTO;
+import org.apache.inlong.manager.common.pojo.datastorage.ck.ClickHouseStorageListResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.ck.ClickHouseStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.ck.ClickHouseStorageResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StorageEntity;
@@ -47,15 +53,13 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.function.Supplier;
-
+/**
+ * ClickHouse storage operation
+ */
 @Service
-public class IcebergStorageOperation implements StorageOperation {
+public class ClickHouseStorageOperation implements StorageOperation {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(IcebergStorageOperation.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseStorageOperation.class);
 
     @Autowired
     private ObjectMapper objectMapper;
@@ -65,18 +69,18 @@ public class IcebergStorageOperation implements StorageOperation {
     private StorageFieldEntityMapper storageFieldMapper;
 
     @Override
-    public Boolean accept(String storageType) {
-        return BizConstant.STORAGE_ICEBERG.equals(storageType);
+    public Boolean accept(StorageType storageType) {
+        return StorageType.CLICKHOUSE.equals(storageType);
     }
 
     @Override
     public Integer saveOpt(StorageRequest request, String operator) {
         String storageType = request.getStorageType();
-        Preconditions.checkTrue(BizConstant.STORAGE_ICEBERG.equals(storageType),
+        Preconditions.checkTrue(BizConstant.STORAGE_CLICKHOUSE.equals(storageType),
                 BizErrorCodeEnum.STORAGE_TYPE_NOT_SUPPORT.getMessage() + ": " + storageType);
 
-        IcebergStorageRequest icebergStorageRequest = (IcebergStorageRequest) request;
-        StorageEntity entity = CommonBeanUtils.copyProperties(icebergStorageRequest, StorageEntity::new);
+        ClickHouseStorageRequest clickHouseStorageRequest = (ClickHouseStorageRequest) request;
+        StorageEntity entity = CommonBeanUtils.copyProperties(clickHouseStorageRequest, StorageEntity::new);
         entity.setStatus(EntityStatus.DATA_STORAGE_NEW.getCode());
         entity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
         entity.setCreator(operator);
@@ -86,16 +90,18 @@ public class IcebergStorageOperation implements StorageOperation {
         entity.setModifyTime(now);
 
         // get the ext params
-        IcebergStorageDTO dto = IcebergStorageDTO.getFromRequest(icebergStorageRequest);
+        ClickHouseStorageDTO dto = ClickHouseStorageDTO.getFromRequest(clickHouseStorageRequest);
         try {
             entity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
             throw new BusinessException(BizErrorCodeEnum.STORAGE_SAVE_FAILED);
         }
         storageMapper.insert(entity);
+
         Integer storageId = entity.getId();
         request.setId(storageId);
         this.saveFieldOpt(request);
+
         return storageId;
     }
 
@@ -127,18 +133,18 @@ public class IcebergStorageOperation implements StorageOperation {
         }
 
         storageFieldMapper.insertAll(entityList);
-        LOGGER.info("success to save hive field");
+        LOGGER.info("success to save clickHouse field");
     }
 
     @Override
-    public StorageResponse getById(String storageType, Integer id) {
+    public StorageResponse getById(@NotNull String storageType, @NotNull Integer id) {
         StorageEntity entity = storageMapper.selectByPrimaryKey(id);
         Preconditions.checkNotNull(entity, BizErrorCodeEnum.STORAGE_INFO_NOT_FOUND.getMessage());
         String existType = entity.getStorageType();
-        Preconditions.checkTrue(BizConstant.STORAGE_ICEBERG.equals(existType),
-                String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_ICEBERG, existType));
+        Preconditions.checkTrue(BizConstant.STORAGE_CLICKHOUSE.equals(existType),
+                String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_CLICKHOUSE, existType));
 
-        StorageResponse response = this.getFromEntity(entity, IcebergStorageResponse::new);
+        StorageResponse response = this.getFromEntity(entity, ClickHouseStorageResponse::new);
         List<StorageFieldEntity> entities = storageFieldMapper.selectByStorageId(id);
         List<StorageFieldResponse> infos = CommonBeanUtils.copyListProperties(entities,
                 StorageFieldResponse::new);
@@ -155,10 +161,10 @@ public class IcebergStorageOperation implements StorageOperation {
         }
 
         String existType = entity.getStorageType();
-        Preconditions.checkTrue(BizConstant.STORAGE_ICEBERG.equals(existType),
-                String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_ICEBERG, existType));
+        Preconditions.checkTrue(BizConstant.STORAGE_CLICKHOUSE.equals(existType),
+                String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_CLICKHOUSE, existType));
 
-        IcebergStorageDTO dto = IcebergStorageDTO.getFromJson(entity.getExtParams());
+        ClickHouseStorageDTO dto = ClickHouseStorageDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
         CommonBeanUtils.copyProperties(dto, result, true);
 
@@ -170,21 +176,21 @@ public class IcebergStorageOperation implements StorageOperation {
         if (CollectionUtils.isEmpty(entityPage)) {
             return new PageInfo<>();
         }
-        return entityPage.toPageInfo(entity -> this.getFromEntity(entity, IcebergStorageListResponse::new));
+        return entityPage.toPageInfo(entity -> this.getFromEntity(entity, ClickHouseStorageListResponse::new));
     }
 
     @Override
     public void updateOpt(StorageRequest request, String operator) {
         String storageType = request.getStorageType();
-        Preconditions.checkTrue(BizConstant.STORAGE_ICEBERG.equals(storageType),
-                String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_ICEBERG, storageType));
+        Preconditions.checkTrue(BizConstant.STORAGE_CLICKHOUSE.equals(storageType),
+                String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_CLICKHOUSE, storageType));
 
         StorageEntity entity = storageMapper.selectByPrimaryKey(request.getId());
         Preconditions.checkNotNull(entity, BizErrorCodeEnum.STORAGE_INFO_NOT_FOUND.getMessage());
-        IcebergStorageRequest icebergStorageRequest = (IcebergStorageRequest) request;
-        CommonBeanUtils.copyProperties(icebergStorageRequest, entity, true);
+        ClickHouseStorageRequest clickHouseStorageRequest = (ClickHouseStorageRequest) request;
+        CommonBeanUtils.copyProperties(clickHouseStorageRequest, entity, true);
         try {
-            IcebergStorageDTO dto = IcebergStorageDTO.getFromRequest(icebergStorageRequest);
+            ClickHouseStorageDTO dto = ClickHouseStorageDTO.getFromRequest(clickHouseStorageRequest);
             entity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
             throw new BusinessException(BizErrorCodeEnum.STORAGE_INFO_INCORRECT.getMessage());
@@ -197,7 +203,7 @@ public class IcebergStorageOperation implements StorageOperation {
         storageMapper.updateByPrimaryKeySelective(entity);
 
         boolean onlyAdd = EntityStatus.DATA_STORAGE_CONFIG_SUCCESSFUL.getCode().equals(entity.getPreviousStatus());
-        this.updateFieldOpt(onlyAdd, icebergStorageRequest);
+        this.updateFieldOpt(onlyAdd, clickHouseStorageRequest);
 
         LOGGER.info("success to update storage of type={}", storageType);
     }
@@ -229,4 +235,5 @@ public class IcebergStorageOperation implements StorageOperation {
 
         LOGGER.info("success to update field");
     }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/iceberg/IcebergStorageOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/iceberg/IcebergStorageOperation.java
index c93b693..5549eea 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/iceberg/IcebergStorageOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/iceberg/IcebergStorageOperation.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.BizConstant;
 import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.enums.StorageType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageFieldRequest;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageFieldResponse;
@@ -65,8 +66,8 @@ public class IcebergStorageOperation implements StorageOperation {
     private StorageFieldEntityMapper storageFieldMapper;
 
     @Override
-    public Boolean accept(String storageType) {
-        return BizConstant.STORAGE_ICEBERG.equals(storageType);
+    public Boolean accept(StorageType storageType) {
+        return StorageType.ICEBERG.equals(storageType);
     }
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/kafka/KafkaStorageOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/kafka/KafkaStorageOperation.java
index 4b6d795..04d6f79 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/kafka/KafkaStorageOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/kafka/KafkaStorageOperation.java
@@ -30,6 +30,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.BizConstant;
 import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.enums.StorageType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageFieldRequest;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageListResponse;
@@ -67,8 +68,8 @@ public class KafkaStorageOperation implements StorageOperation {
     private StorageFieldEntityMapper storageFieldMapper;
 
     @Override
-    public Boolean accept(String storageType) {
-        return BizConstant.STORAGE_KAFKA.equals(storageType);
+    public Boolean accept(StorageType storageType) {
+        return StorageType.KAFKA.equals(storageType);
     }
 
     @Override
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ClickHouseStorageServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ClickHouseStorageServiceTest.java
new file mode 100644
index 0000000..9cf2c3e
--- /dev/null
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ClickHouseStorageServiceTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core;
+
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.ck.ClickHouseStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.ck.ClickHouseStorageResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.service.storage.StorageService;
+import org.apache.inlong.manager.web.WebBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Data storage service test
+ */
+public class ClickHouseStorageServiceTest extends WebBaseTest {
+
+    @Autowired
+    private StorageService storageService;
+    @Autowired
+    private DataStreamServiceTest streamServiceTest;
+
+    // Partial test data
+    private static final String globalGroupId = "b_group1";
+    private static final String globalStreamId = "stream1";
+    private static final String globalOperator = "test_user";
+    private static final String ckJdbcUrl = "jdbc:clickhouse://127.0.0.1:8123/default";
+    private static final String ckUsername = "ck_user";
+    private static final String ckDatabaseName = "ck_db";
+    private static final String ckTableName = "ck_tbl";
+
+    private static Integer ckStorageId;
+
+    @Before
+    public void saveStorage() {
+        streamServiceTest.saveDataStream(globalGroupId, globalStreamId, globalOperator);
+        ClickHouseStorageRequest storageInfo = new ClickHouseStorageRequest();
+        storageInfo.setInlongGroupId(globalGroupId);
+        storageInfo.setInlongStreamId(globalStreamId);
+        storageInfo.setStorageType(BizConstant.STORAGE_CLICKHOUSE);
+        storageInfo.setJdbcUrl(ckJdbcUrl);
+        storageInfo.setUsername(ckUsername);
+        storageInfo.setDatabaseName(ckDatabaseName);
+        storageInfo.setTableName(ckTableName);
+        storageInfo.setEnableCreateResource(BizConstant.DISABLE_CREATE_RESOURCE);
+        ckStorageId = storageService.save(storageInfo, globalOperator);
+    }
+
+    @After
+    public void deleteKafkaStorage() {
+        boolean result = storageService.delete(ckStorageId, BizConstant.STORAGE_CLICKHOUSE, globalOperator);
+        Assert.assertTrue(result);
+    }
+
+    @Test
+    public void testListByIdentifier() {
+        StorageResponse storage = storageService.get(ckStorageId, BizConstant.STORAGE_CLICKHOUSE);
+        Assert.assertEquals(globalGroupId, storage.getInlongGroupId());
+    }
+
+    @Test
+    public void testGetAndUpdate() {
+        StorageResponse response = storageService.get(ckStorageId, BizConstant.STORAGE_CLICKHOUSE);
+        Assert.assertEquals(globalGroupId, response.getInlongGroupId());
+
+        ClickHouseStorageResponse kafkaStorageResponse = (ClickHouseStorageResponse) response;
+        kafkaStorageResponse.setEnableCreateResource(BizConstant.ENABLE_CREATE_RESOURCE);
+
+        ClickHouseStorageRequest request = CommonBeanUtils
+                .copyProperties(kafkaStorageResponse, ClickHouseStorageRequest::new);
+        boolean result = storageService.update(request, globalOperator);
+        Assert.assertTrue(result);
+    }
+
+}