You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/19 07:00:36 UTC
[incubator-inlong] branch master updated: [INLONG-2579][Manager] Support data flow to ClickHouse (#2581)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 720f68c [INLONG-2579][Manager] Support data flow to ClickHouse (#2581)
720f68c is described below
commit 720f68c8054bd96621412928884a733bd0d1d2ea
Author: chestnufang <65...@users.noreply.github.com>
AuthorDate: Sat Feb 19 15:00:31 2022 +0800
[INLONG-2579][Manager] Support data flow to ClickHouse (#2581)
* [INLONG-2579][Manager] Support data flow to ClickHouse
* [INLONG-2579][Manager] To use storage enum
---
.../inlong/manager/common/enums/BizConstant.java | 6 +-
.../inlong/manager/common/enums/StorageType.java | 2 +-
.../pojo/datastorage/ck/ClickHouseStorageDTO.java | 107 +++++++++++++++++++++
.../ck/ClickHouseStorageListResponse.java | 70 ++++++++++++++
.../datastorage/ck/ClickHouseStorageRequest.java | 75 +++++++++++++++
.../datastorage/ck/ClickHouseStorageResponse.java | 75 +++++++++++++++
.../ClickHouseStorageOperation.java} | 73 +++++++-------
.../storage/iceberg/IcebergStorageOperation.java | 5 +-
.../storage/kafka/KafkaStorageOperation.java | 5 +-
.../service/core/ClickHouseStorageServiceTest.java | 95 ++++++++++++++++++
10 files changed, 473 insertions(+), 40 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
index a700f15..40cf69c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
@@ -26,6 +26,10 @@ public class BizConstant {
public static final String STORAGE_KAFKA = "KAFKA";
+ public static final String STORAGE_CLICKHOUSE = "CLICKHOUSE";
+
+ public static final String STORAGE_ICEBERG = "ICEBERG";
+
public static final String DATA_SOURCE_DB = "DB";
public static final String DATA_SOURCE_FILE = "FILE";
@@ -78,6 +82,4 @@ public class BizConstant {
public static final Integer DISABLE_CREATE_RESOURCE = 0; // Disable create resource
- public static final String STORAGE_ICEBERG = "ICEBERG";
-
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StorageType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StorageType.java
index d691aa8..234972d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StorageType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StorageType.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.common.enums;
import java.util.Locale;
public enum StorageType {
- HIVE, ES, CLICKHOUSE;
+ HIVE, ES, CLICKHOUSE, ICEBERG, KAFKA;
@Override
public String toString() {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageDTO.java
new file mode 100644
index 0000000..ce0492e
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageDTO.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.datastorage.ck;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+/**
+ * ClickHouse storage info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ClickHouseStorageDTO {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @ApiModelProperty("ClickHouse JDBC URL")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Target database name")
+ private String databaseName;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Username for JDBC URL")
+ private String username;
+
+ @ApiModelProperty("User password")
+ private String password;
+
+ @ApiModelProperty("Whether distributed table")
+ private Boolean distributedTable;
+
+ @ApiModelProperty("Partition strategy,support: BALANCE, RANDOM, HASH")
+ private String partitionStrategy;
+
+ @ApiModelProperty("Partition key")
+ private String partitionKey;
+
+ @ApiModelProperty("Key field names")
+ private String[] keyFieldNames;
+
+ @ApiModelProperty("Flush interval")
+ private Integer flushInterval;
+
+ @ApiModelProperty("Flush record number")
+ private Integer flushRecordNumber;
+
+ @ApiModelProperty("Write max retry times")
+ private Integer writeMaxRetryTimes;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static ClickHouseStorageDTO getFromRequest(ClickHouseStorageRequest request) {
+ return ClickHouseStorageDTO.builder()
+ .jdbcUrl(request.getJdbcUrl())
+ .username(request.getUsername())
+ .password(request.getPassword())
+ .databaseName(request.getDatabaseName())
+ .tableName(request.getTableName())
+ .distributedTable(request.getDistributedTable())
+ .partitionStrategy(request.getPartitionStrategy())
+ .partitionKey(request.getPartitionKey())
+ .keyFieldNames(request.getKeyFieldNames())
+ .flushInterval(request.getFlushInterval())
+ .flushRecordNumber(request.getFlushRecordNumber())
+ .writeMaxRetryTimes(request.getWriteMaxRetryTimes())
+ .build();
+ }
+
+ public static ClickHouseStorageDTO getFromJson(@NotNull String extParams) {
+ try {
+ OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ return OBJECT_MAPPER.readValue(extParams, ClickHouseStorageDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(BizErrorCodeEnum.STORAGE_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageListResponse.java
new file mode 100644
index 0000000..982b7c4
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageListResponse.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.datastorage.ck;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageListResponse;
+
+/**
+ * Response of ClickHouse storage list
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Response of ClickHouse storage paging list")
+public class ClickHouseStorageListResponse extends StorageListResponse {
+
+ @ApiModelProperty("ClickHouse JDBC URL")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Target database name")
+ private String databaseName;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Username for JDBC URL")
+ private String username;
+
+ @ApiModelProperty("User password")
+ private String password;
+
+ @ApiModelProperty("Whether distributed table")
+ private Boolean distributedTable;
+
+ @ApiModelProperty("Partition strategy,support: BALANCE, RANDOM, HASH")
+ private String partitionStrategy;
+
+ @ApiModelProperty("Partition key")
+ private String partitionKey;
+
+ @ApiModelProperty("Key field names")
+ private String[] keyFieldNames;
+
+ @ApiModelProperty("Flush interval")
+ private Integer flushInterval;
+
+ @ApiModelProperty("Flush record number")
+ private Integer flushRecordNumber;
+
+ @ApiModelProperty("Write max retry times")
+ private Integer writeMaxRetryTimes;
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageRequest.java
new file mode 100644
index 0000000..8bf4258
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageRequest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.datastorage.ck;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of the ClickHouse storage info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the ClickHouse storage info")
+@JsonTypeDefine(value = BizConstant.STORAGE_CLICKHOUSE)
+public class ClickHouseStorageRequest extends StorageRequest {
+
+ @ApiModelProperty("ClickHouse JDBC URL")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Target database name")
+ private String databaseName;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Username for JDBC URL")
+ private String username;
+
+ @ApiModelProperty("User password")
+ private String password;
+
+ @ApiModelProperty("Whether distributed table")
+ private Boolean distributedTable;
+
+ @ApiModelProperty("Partition strategy,support: BALANCE, RANDOM, HASH")
+ private String partitionStrategy;
+
+ @ApiModelProperty("Partition key")
+ private String partitionKey;
+
+ @ApiModelProperty("Key field names")
+ private String[] keyFieldNames;
+
+ @ApiModelProperty("Flush interval")
+ private Integer flushInterval;
+
+ @ApiModelProperty("Flush record number")
+ private Integer flushRecordNumber;
+
+ @ApiModelProperty("Write max retry times")
+ private Integer writeMaxRetryTimes;
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageResponse.java
new file mode 100644
index 0000000..2776414
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/ck/ClickHouseStorageResponse.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.datastorage.ck;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageResponse;
+
+/**
+ * Response of the ClickHouse storage
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Response of the ClickHouse storage")
+public class ClickHouseStorageResponse extends StorageResponse {
+
+ private String storageType = BizConstant.STORAGE_CLICKHOUSE;
+
+ @ApiModelProperty("ClickHouse JDBC URL")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Target database name")
+ private String databaseName;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Username for JDBC URL")
+ private String username;
+
+ @ApiModelProperty("User password")
+ private String password;
+
+ @ApiModelProperty("Whether distributed table")
+ private Boolean distributedTable;
+
+ @ApiModelProperty("Partition strategy,support: BALANCE, RANDOM, HASH")
+ private String partitionStrategy;
+
+ @ApiModelProperty("Partition key")
+ private String partitionKey;
+
+ @ApiModelProperty("Key field names")
+ private String[] keyFieldNames;
+
+ @ApiModelProperty("Flush interval")
+ private Integer flushInterval;
+
+ @ApiModelProperty("Flush record number")
+ private Integer flushRecordNumber;
+
+ @ApiModelProperty("Write max retry times")
+ private Integer writeMaxRetryTimes;
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/iceberg/IcebergStorageOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/ck/ClickHouseStorageOperation.java
similarity index 78%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/iceberg/IcebergStorageOperation.java
copy to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/ck/ClickHouseStorageOperation.java
index c93b693..341e44e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/iceberg/IcebergStorageOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/ck/ClickHouseStorageOperation.java
@@ -15,26 +15,32 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.storage.iceberg;
+package org.apache.inlong.manager.service.storage.ck;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageInfo;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Supplier;
+import javax.validation.constraints.NotNull;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.BizConstant;
import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.enums.StorageType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.datastorage.StorageFieldRequest;
import org.apache.inlong.manager.common.pojo.datastorage.StorageFieldResponse;
import org.apache.inlong.manager.common.pojo.datastorage.StorageListResponse;
import org.apache.inlong.manager.common.pojo.datastorage.StorageRequest;
import org.apache.inlong.manager.common.pojo.datastorage.StorageResponse;
-import org.apache.inlong.manager.common.pojo.datastorage.iceberg.IcebergStorageDTO;
-import org.apache.inlong.manager.common.pojo.datastorage.iceberg.IcebergStorageListResponse;
-import org.apache.inlong.manager.common.pojo.datastorage.iceberg.IcebergStorageRequest;
-import org.apache.inlong.manager.common.pojo.datastorage.iceberg.IcebergStorageResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.ck.ClickHouseStorageDTO;
+import org.apache.inlong.manager.common.pojo.datastorage.ck.ClickHouseStorageListResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.ck.ClickHouseStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.ck.ClickHouseStorageResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StorageEntity;
@@ -47,15 +53,13 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.function.Supplier;
-
+/**
+ * ClickHouse storage operation
+ */
@Service
-public class IcebergStorageOperation implements StorageOperation {
+public class ClickHouseStorageOperation implements StorageOperation {
- private static final Logger LOGGER = LoggerFactory.getLogger(IcebergStorageOperation.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseStorageOperation.class);
@Autowired
private ObjectMapper objectMapper;
@@ -65,18 +69,18 @@ public class IcebergStorageOperation implements StorageOperation {
private StorageFieldEntityMapper storageFieldMapper;
@Override
- public Boolean accept(String storageType) {
- return BizConstant.STORAGE_ICEBERG.equals(storageType);
+ public Boolean accept(StorageType storageType) {
+ return StorageType.CLICKHOUSE.equals(storageType);
}
@Override
public Integer saveOpt(StorageRequest request, String operator) {
String storageType = request.getStorageType();
- Preconditions.checkTrue(BizConstant.STORAGE_ICEBERG.equals(storageType),
+ Preconditions.checkTrue(BizConstant.STORAGE_CLICKHOUSE.equals(storageType),
BizErrorCodeEnum.STORAGE_TYPE_NOT_SUPPORT.getMessage() + ": " + storageType);
- IcebergStorageRequest icebergStorageRequest = (IcebergStorageRequest) request;
- StorageEntity entity = CommonBeanUtils.copyProperties(icebergStorageRequest, StorageEntity::new);
+ ClickHouseStorageRequest clickHouseStorageRequest = (ClickHouseStorageRequest) request;
+ StorageEntity entity = CommonBeanUtils.copyProperties(clickHouseStorageRequest, StorageEntity::new);
entity.setStatus(EntityStatus.DATA_STORAGE_NEW.getCode());
entity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
entity.setCreator(operator);
@@ -86,16 +90,18 @@ public class IcebergStorageOperation implements StorageOperation {
entity.setModifyTime(now);
// get the ext params
- IcebergStorageDTO dto = IcebergStorageDTO.getFromRequest(icebergStorageRequest);
+ ClickHouseStorageDTO dto = ClickHouseStorageDTO.getFromRequest(clickHouseStorageRequest);
try {
entity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
throw new BusinessException(BizErrorCodeEnum.STORAGE_SAVE_FAILED);
}
storageMapper.insert(entity);
+
Integer storageId = entity.getId();
request.setId(storageId);
this.saveFieldOpt(request);
+
return storageId;
}
@@ -127,18 +133,18 @@ public class IcebergStorageOperation implements StorageOperation {
}
storageFieldMapper.insertAll(entityList);
- LOGGER.info("success to save hive field");
+ LOGGER.info("success to save clickHouse field");
}
@Override
- public StorageResponse getById(String storageType, Integer id) {
+ public StorageResponse getById(@NotNull String storageType, @NotNull Integer id) {
StorageEntity entity = storageMapper.selectByPrimaryKey(id);
Preconditions.checkNotNull(entity, BizErrorCodeEnum.STORAGE_INFO_NOT_FOUND.getMessage());
String existType = entity.getStorageType();
- Preconditions.checkTrue(BizConstant.STORAGE_ICEBERG.equals(existType),
- String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_ICEBERG, existType));
+ Preconditions.checkTrue(BizConstant.STORAGE_CLICKHOUSE.equals(existType),
+ String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_CLICKHOUSE, existType));
- StorageResponse response = this.getFromEntity(entity, IcebergStorageResponse::new);
+ StorageResponse response = this.getFromEntity(entity, ClickHouseStorageResponse::new);
List<StorageFieldEntity> entities = storageFieldMapper.selectByStorageId(id);
List<StorageFieldResponse> infos = CommonBeanUtils.copyListProperties(entities,
StorageFieldResponse::new);
@@ -155,10 +161,10 @@ public class IcebergStorageOperation implements StorageOperation {
}
String existType = entity.getStorageType();
- Preconditions.checkTrue(BizConstant.STORAGE_ICEBERG.equals(existType),
- String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_ICEBERG, existType));
+ Preconditions.checkTrue(BizConstant.STORAGE_CLICKHOUSE.equals(existType),
+ String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_CLICKHOUSE, existType));
- IcebergStorageDTO dto = IcebergStorageDTO.getFromJson(entity.getExtParams());
+ ClickHouseStorageDTO dto = ClickHouseStorageDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
CommonBeanUtils.copyProperties(dto, result, true);
@@ -170,21 +176,21 @@ public class IcebergStorageOperation implements StorageOperation {
if (CollectionUtils.isEmpty(entityPage)) {
return new PageInfo<>();
}
- return entityPage.toPageInfo(entity -> this.getFromEntity(entity, IcebergStorageListResponse::new));
+ return entityPage.toPageInfo(entity -> this.getFromEntity(entity, ClickHouseStorageListResponse::new));
}
@Override
public void updateOpt(StorageRequest request, String operator) {
String storageType = request.getStorageType();
- Preconditions.checkTrue(BizConstant.STORAGE_ICEBERG.equals(storageType),
- String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_ICEBERG, storageType));
+ Preconditions.checkTrue(BizConstant.STORAGE_CLICKHOUSE.equals(storageType),
+ String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_CLICKHOUSE, storageType));
StorageEntity entity = storageMapper.selectByPrimaryKey(request.getId());
Preconditions.checkNotNull(entity, BizErrorCodeEnum.STORAGE_INFO_NOT_FOUND.getMessage());
- IcebergStorageRequest icebergStorageRequest = (IcebergStorageRequest) request;
- CommonBeanUtils.copyProperties(icebergStorageRequest, entity, true);
+ ClickHouseStorageRequest clickHouseStorageRequest = (ClickHouseStorageRequest) request;
+ CommonBeanUtils.copyProperties(clickHouseStorageRequest, entity, true);
try {
- IcebergStorageDTO dto = IcebergStorageDTO.getFromRequest(icebergStorageRequest);
+ ClickHouseStorageDTO dto = ClickHouseStorageDTO.getFromRequest(clickHouseStorageRequest);
entity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
throw new BusinessException(BizErrorCodeEnum.STORAGE_INFO_INCORRECT.getMessage());
@@ -197,7 +203,7 @@ public class IcebergStorageOperation implements StorageOperation {
storageMapper.updateByPrimaryKeySelective(entity);
boolean onlyAdd = EntityStatus.DATA_STORAGE_CONFIG_SUCCESSFUL.getCode().equals(entity.getPreviousStatus());
- this.updateFieldOpt(onlyAdd, icebergStorageRequest);
+ this.updateFieldOpt(onlyAdd, clickHouseStorageRequest);
LOGGER.info("success to update storage of type={}", storageType);
}
@@ -229,4 +235,5 @@ public class IcebergStorageOperation implements StorageOperation {
LOGGER.info("success to update field");
}
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/iceberg/IcebergStorageOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/iceberg/IcebergStorageOperation.java
index c93b693..5549eea 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/iceberg/IcebergStorageOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/iceberg/IcebergStorageOperation.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.BizConstant;
import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.enums.StorageType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.datastorage.StorageFieldRequest;
import org.apache.inlong.manager.common.pojo.datastorage.StorageFieldResponse;
@@ -65,8 +66,8 @@ public class IcebergStorageOperation implements StorageOperation {
private StorageFieldEntityMapper storageFieldMapper;
@Override
- public Boolean accept(String storageType) {
- return BizConstant.STORAGE_ICEBERG.equals(storageType);
+ public Boolean accept(StorageType storageType) {
+ return StorageType.ICEBERG.equals(storageType);
}
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/kafka/KafkaStorageOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/kafka/KafkaStorageOperation.java
index 4b6d795..04d6f79 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/kafka/KafkaStorageOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/kafka/KafkaStorageOperation.java
@@ -30,6 +30,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.BizConstant;
import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.enums.StorageType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.datastorage.StorageFieldRequest;
import org.apache.inlong.manager.common.pojo.datastorage.StorageListResponse;
@@ -67,8 +68,8 @@ public class KafkaStorageOperation implements StorageOperation {
private StorageFieldEntityMapper storageFieldMapper;
@Override
- public Boolean accept(String storageType) {
- return BizConstant.STORAGE_KAFKA.equals(storageType);
+ public Boolean accept(StorageType storageType) {
+ return StorageType.KAFKA.equals(storageType);
}
@Override
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ClickHouseStorageServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ClickHouseStorageServiceTest.java
new file mode 100644
index 0000000..9cf2c3e
--- /dev/null
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ClickHouseStorageServiceTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core;
+
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.ck.ClickHouseStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.ck.ClickHouseStorageResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.service.storage.StorageService;
+import org.apache.inlong.manager.web.WebBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Data storage service test
+ */
+public class ClickHouseStorageServiceTest extends WebBaseTest {
+
+ @Autowired
+ private StorageService storageService;
+ @Autowired
+ private DataStreamServiceTest streamServiceTest;
+
+ // Partial test data
+ private static final String globalGroupId = "b_group1";
+ private static final String globalStreamId = "stream1";
+ private static final String globalOperator = "test_user";
+ private static final String ckJdbcUrl = "jdbc:clickhouse://127.0.0.1:8123/default";
+ private static final String ckUsername = "ck_user";
+ private static final String ckDatabaseName = "ck_db";
+ private static final String ckTableName = "ck_tbl";
+
+ private static Integer ckStorageId;
+
+ @Before
+ public void saveStorage() {
+ streamServiceTest.saveDataStream(globalGroupId, globalStreamId, globalOperator);
+ ClickHouseStorageRequest storageInfo = new ClickHouseStorageRequest();
+ storageInfo.setInlongGroupId(globalGroupId);
+ storageInfo.setInlongStreamId(globalStreamId);
+ storageInfo.setStorageType(BizConstant.STORAGE_CLICKHOUSE);
+ storageInfo.setJdbcUrl(ckJdbcUrl);
+ storageInfo.setUsername(ckUsername);
+ storageInfo.setDatabaseName(ckDatabaseName);
+ storageInfo.setTableName(ckTableName);
+ storageInfo.setEnableCreateResource(BizConstant.DISABLE_CREATE_RESOURCE);
+ ckStorageId = storageService.save(storageInfo, globalOperator);
+ }
+
+ @After
+ public void deleteKafkaStorage() {
+ boolean result = storageService.delete(ckStorageId, BizConstant.STORAGE_CLICKHOUSE, globalOperator);
+ Assert.assertTrue(result);
+ }
+
+ @Test
+ public void testListByIdentifier() {
+ StorageResponse storage = storageService.get(ckStorageId, BizConstant.STORAGE_CLICKHOUSE);
+ Assert.assertEquals(globalGroupId, storage.getInlongGroupId());
+ }
+
+ @Test
+ public void testGetAndUpdate() {
+ StorageResponse response = storageService.get(ckStorageId, BizConstant.STORAGE_CLICKHOUSE);
+ Assert.assertEquals(globalGroupId, response.getInlongGroupId());
+
+ ClickHouseStorageResponse kafkaStorageResponse = (ClickHouseStorageResponse) response;
+ kafkaStorageResponse.setEnableCreateResource(BizConstant.ENABLE_CREATE_RESOURCE);
+
+ ClickHouseStorageRequest request = CommonBeanUtils
+ .copyProperties(kafkaStorageResponse, ClickHouseStorageRequest::new);
+ boolean result = storageService.update(request, globalOperator);
+ Assert.assertTrue(result);
+ }
+
+}