You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/19 03:49:12 UTC

[incubator-inlong] branch master updated: [INLONG-2562] [Manager] Support data flow to Kafka (#2578)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 335c95f  [INLONG-2562] [Manager] Support data flow to Kafka (#2578)
335c95f is described below

commit 335c95f895311963ea3714913f585d21cdad59f9
Author: chestnufang <65...@users.noreply.github.com>
AuthorDate: Sat Feb 19 11:49:07 2022 +0800

    [INLONG-2562] [Manager] Support data flow to Kafka (#2578)
---
 .../inlong/manager/common/enums/BizConstant.java   |   2 +
 .../pojo/datastorage/kafka/KafkaStorageDTO.java    |  70 +++++++
 .../kafka/KafkaStorageListResponse.java            |  43 ++++
 .../datastorage/kafka/KafkaStorageRequest.java     |  49 +++++
 .../datastorage/kafka/KafkaStorageResponse.java    |  48 +++++
 .../storage/kafka/KafkaStorageOperation.java       | 224 +++++++++++++++++++++
 .../service/core/KafkaStorageServiceTest.java      |  92 +++++++++
 7 files changed, 528 insertions(+)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
index aa76288..c68812f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
@@ -24,6 +24,8 @@ public class BizConstant {
 
     public static final String STORAGE_HIVE = "HIVE";
 
+    public static final String STORAGE_KAFKA = "KAFKA";
+
     public static final String DATA_SOURCE_DB = "DB";
 
     public static final String DATA_SOURCE_FILE = "FILE";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/kafka/KafkaStorageDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/kafka/KafkaStorageDTO.java
new file mode 100644
index 0000000..8da38f5
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/kafka/KafkaStorageDTO.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.datastorage.kafka;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+/**
+ * Kafka storage info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class KafkaStorageDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    @ApiModelProperty("Kafka bootstrap servers")
+    private String address;
+
+    @ApiModelProperty("Kafka topicName")
+    private String topicName;
+
+    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
+    private String serializationType;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static KafkaStorageDTO getFromRequest(KafkaStorageRequest request) {
+        return KafkaStorageDTO.builder()
+                .address(request.getAddress())
+                .topicName(request.getTopicName())
+                .serializationType(request.getSerializationType())
+                .build();
+    }
+
+    public static KafkaStorageDTO getFromJson(@NotNull String extParams) {
+        try {
+            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            return OBJECT_MAPPER.readValue(extParams, KafkaStorageDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(BizErrorCodeEnum.STORAGE_INFO_INCORRECT.getMessage());
+        }
+    }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/kafka/KafkaStorageListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/kafka/KafkaStorageListResponse.java
new file mode 100644
index 0000000..f24af95
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/kafka/KafkaStorageListResponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.datastorage.kafka;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageListResponse;
+
+/**
+ * Kafka storage item of a paged storage list, adding Kafka fields to StorageListResponse
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Response of Kafka storage paging list")
+public class KafkaStorageListResponse extends StorageListResponse {
+
+    @ApiModelProperty("Kafka bootstrap servers")
+    private String address;
+
+    @ApiModelProperty("Kafka topicName")
+    private String topicName;
+
+    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
+    private String serializationType;
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/kafka/KafkaStorageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/kafka/KafkaStorageRequest.java
new file mode 100644
index 0000000..0d75808
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/kafka/KafkaStorageRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.datastorage.kafka;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of the Kafka storage info, tagged with BizConstant.STORAGE_KAFKA through JsonTypeDefine
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the Kafka storage info")
+@JsonTypeDefine(value = BizConstant.STORAGE_KAFKA)
+public class KafkaStorageRequest extends StorageRequest {
+
+    @ApiModelProperty("Kafka bootstrap servers")
+    private String address;
+
+    @ApiModelProperty("Kafka topicName")
+    private String topicName;
+
+    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
+    private String serializationType;
+
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/kafka/KafkaStorageResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/kafka/KafkaStorageResponse.java
new file mode 100644
index 0000000..9c1053f
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/kafka/KafkaStorageResponse.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.datastorage.kafka;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageResponse;
+
+/**
+ * Response of the Kafka storage; declares a storageType field initialised to BizConstant.STORAGE_KAFKA
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Response of the Kafka storage")
+public class KafkaStorageResponse extends StorageResponse {
+
+    private String storageType = BizConstant.STORAGE_KAFKA;
+
+    @ApiModelProperty("Kafka bootstrap servers")
+    private String address;
+
+    @ApiModelProperty("Kafka topicName")
+    private String topicName;
+
+    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
+    private String serializationType;
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/kafka/KafkaStorageOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/kafka/KafkaStorageOperation.java
new file mode 100644
index 0000000..4b6d795
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/storage/kafka/KafkaStorageOperation.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.storage.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Supplier;
+import javax.validation.constraints.NotNull;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageFieldRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageListResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.kafka.KafkaStorageDTO;
+import org.apache.inlong.manager.common.pojo.datastorage.kafka.KafkaStorageListResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.kafka.KafkaStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.kafka.KafkaStorageResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StorageEntity;
+import org.apache.inlong.manager.dao.entity.StorageFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StorageEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StorageFieldEntityMapper;
+import org.apache.inlong.manager.service.storage.StorageOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Kafka storage operation
+ */
+@Service
+public class KafkaStorageOperation implements StorageOperation {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStorageOperation.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private StorageEntityMapper storageMapper;
+    @Autowired
+    private StorageFieldEntityMapper storageFieldMapper;
+
+    @Override
+    public Boolean accept(String storageType) {
+        return BizConstant.STORAGE_KAFKA.equals(storageType); // constant-first equals: a null type yields false
+    }
+
+    @Override
+    public Integer saveOpt(StorageRequest request, String operator) {
+        String storageType = request.getStorageType();
+        Preconditions.checkTrue(BizConstant.STORAGE_KAFKA.equals(storageType),
+                BizErrorCodeEnum.STORAGE_TYPE_NOT_SUPPORT.getMessage() + ": " + storageType);
+
+        KafkaStorageRequest kafkaStorageRequest = (KafkaStorageRequest) request;
+        StorageEntity entity = CommonBeanUtils.copyProperties(kafkaStorageRequest, StorageEntity::new);
+        entity.setStatus(EntityStatus.DATA_STORAGE_NEW.getCode());
+        entity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
+        entity.setCreator(operator);
+        entity.setModifier(operator);
+        Date now = new Date();
+        entity.setCreateTime(now);
+        entity.setModifyTime(now);
+
+        // get the ext params
+        KafkaStorageDTO dto = KafkaStorageDTO.getFromRequest(kafkaStorageRequest);
+        try {
+            entity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(BizErrorCodeEnum.STORAGE_SAVE_FAILED);
+        }
+        storageMapper.insert(entity);
+        Integer storageId = entity.getId();
+        request.setId(storageId);
+        this.saveFieldOpt(request);
+        return storageId;
+    }
+
+    @Override
+    public void saveFieldOpt(StorageRequest request) {
+        List<StorageFieldRequest> requestedFields = request.getFieldList();
+        LOGGER.info("begin to save field={}", requestedFields);
+        if (CollectionUtils.isEmpty(requestedFields)) {
+            return; // nothing to persist
+        }
+
+        // values shared by every field row of this storage
+        String inlongGroupId = request.getInlongGroupId();
+        String inlongStreamId = request.getInlongStreamId();
+        String type = request.getStorageType();
+        Integer id = request.getId();
+        List<StorageFieldEntity> rows = new ArrayList<>(requestedFields.size());
+        requestedFields.forEach(field -> {
+            StorageFieldEntity row = CommonBeanUtils.copyProperties(field, StorageFieldEntity::new);
+            if (StringUtils.isEmpty(row.getFieldComment())) {
+                row.setFieldComment(row.getFieldName()); // fall back to the field name as its comment
+            }
+            row.setInlongGroupId(inlongGroupId);
+            row.setInlongStreamId(inlongStreamId);
+            row.setStorageType(type);
+            row.setStorageId(id);
+            row.setIsDeleted(EntityStatus.UN_DELETED.getCode());
+            rows.add(row);
+        });
+
+        storageFieldMapper.insertAll(rows);
+        LOGGER.info("success to save field");
+    }
+
+    @Override
+    public StorageResponse getById(@NotNull String storageType, @NotNull Integer id) {
+        StorageEntity found = storageMapper.selectByPrimaryKey(id);
+        Preconditions.checkNotNull(found, BizErrorCodeEnum.STORAGE_INFO_NOT_FOUND.getMessage());
+        String actualType = found.getStorageType(); // the persisted type is what gets validated
+        String mismatch = String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_KAFKA, actualType);
+        Preconditions.checkTrue(BizConstant.STORAGE_KAFKA.equals(actualType), mismatch);
+        return this.getFromEntity(found, KafkaStorageResponse::new);
+    }
+
+    @Override
+    public <T> T getFromEntity(StorageEntity entity, Supplier<T> target) {
+        final T response = target.get();
+        if (entity != null) {
+            String actualType = entity.getStorageType();
+            String mismatch = String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_KAFKA, actualType);
+            Preconditions.checkTrue(BizConstant.STORAGE_KAFKA.equals(actualType), mismatch);
+
+            // entity properties are copied first, then the Kafka fields parsed from the ext params JSON
+            KafkaStorageDTO kafkaFields = KafkaStorageDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(entity, response, true);
+            CommonBeanUtils.copyProperties(kafkaFields, response, true);
+        }
+        // a null entity yields the freshly supplied, unpopulated target
+        return response;
+    }
+
+    @Override
+    public PageInfo<? extends StorageListResponse> getPageInfo(Page<StorageEntity> entityPage) {
+        if (!CollectionUtils.isEmpty(entityPage)) {
+            return entityPage.toPageInfo(row -> this.getFromEntity(row, KafkaStorageListResponse::new));
+        }
+        return new PageInfo<>(); // no entities on the page: hand back an empty PageInfo
+    }
+
+    @Override
+    public void updateOpt(StorageRequest request, String operator) {
+        String storageType = request.getStorageType();
+        Preconditions.checkTrue(BizConstant.STORAGE_KAFKA.equals(storageType),
+                String.format(BizConstant.STORAGE_TYPE_NOT_SAME, BizConstant.STORAGE_KAFKA, storageType));
+
+        StorageEntity entity = storageMapper.selectByPrimaryKey(request.getId());
+        Preconditions.checkNotNull(entity, BizErrorCodeEnum.STORAGE_INFO_NOT_FOUND.getMessage());
+        KafkaStorageRequest kafkaStorageRequest = (KafkaStorageRequest) request;
+        CommonBeanUtils.copyProperties(kafkaStorageRequest, entity, true);
+        try {
+            KafkaStorageDTO dto = KafkaStorageDTO.getFromRequest(kafkaStorageRequest);
+            entity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(BizErrorCodeEnum.STORAGE_INFO_INCORRECT.getMessage());
+        }
+
+        entity.setPreviousStatus(entity.getStatus());
+        entity.setStatus(EntityStatus.BIZ_CONFIG_ING.getCode());
+        entity.setModifier(operator);
+        entity.setModifyTime(new Date());
+        storageMapper.updateByPrimaryKeySelective(entity);
+
+        boolean onlyAdd = EntityStatus.DATA_STORAGE_CONFIG_SUCCESSFUL.getCode().equals(entity.getPreviousStatus());
+        this.updateFieldOpt(onlyAdd, kafkaStorageRequest);
+
+        LOGGER.info("success to update storage of type={}", storageType);
+    }
+
+    @Override
+    public void updateFieldOpt(Boolean onlyAdd, StorageRequest request) {
+        Integer storageId = request.getId();
+        List<StorageFieldRequest> fieldRequestList = request.getFieldList();
+        if (CollectionUtils.isEmpty(fieldRequestList)) {
+            return; // no fields requested: existing fields are left untouched
+        }
+        if (Boolean.TRUE.equals(onlyAdd)) { // null-safe: a null flag no longer throws on unboxing
+            List<StorageFieldEntity> existsFieldList = storageFieldMapper.selectByStorageId(storageId);
+            if (existsFieldList.size() > fieldRequestList.size()) {
+                throw new BusinessException(BizErrorCodeEnum.STORAGE_FIELD_UPDATE_NOT_ALLOWED);
+            }
+            for (int i = 0; i < existsFieldList.size(); i++) { // existing names must stay at the same positions
+                if (!existsFieldList.get(i).getFieldName().equals(fieldRequestList.get(i).getFieldName())) {
+                    throw new BusinessException(BizErrorCodeEnum.STORAGE_FIELD_UPDATE_NOT_ALLOWED);
+                }
+            }
+        }
+        // First physically delete the existing fields
+        storageFieldMapper.deleteAll(storageId);
+        // Then batch save the storage fields
+        this.saveFieldOpt(request);
+        LOGGER.info("success to update field");
+    }
+
+}
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/KafkaStorageServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/KafkaStorageServiceTest.java
new file mode 100644
index 0000000..6e64727
--- /dev/null
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/KafkaStorageServiceTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core;
+
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.kafka.KafkaStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.kafka.KafkaStorageResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.service.storage.StorageService;
+import org.apache.inlong.manager.web.WebBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Kafka storage service test: saves a Kafka storage before each test and deletes it afterwards
+ */
+public class KafkaStorageServiceTest extends WebBaseTest {
+
+    @Autowired
+    private StorageService storageService;
+    @Autowired
+    private DataStreamServiceTest streamServiceTest;
+
+    private static final String globalGroupId = "b_group1";
+    private static final String globalStreamId = "stream1";
+    private static final String globalOperator = "test_user";
+    private static final String bootstrapServers = "127.0.0.1:9092";
+    private static final String serializationType = "Json";
+    private static final String topicName = "kafka_topic_name";
+
+    // id returned by storageService.save in saveStorage(); static, but reassigned before every test
+    private static Integer kafkaStorageId;
+
+    @Before
+    public void saveStorage() {
+        streamServiceTest.saveDataStream(globalGroupId, globalStreamId, globalOperator);
+        KafkaStorageRequest storageInfo = new KafkaStorageRequest();
+        storageInfo.setInlongGroupId(globalGroupId);
+        storageInfo.setInlongStreamId(globalStreamId);
+        storageInfo.setStorageType(BizConstant.STORAGE_KAFKA);
+        storageInfo.setSerializationType(serializationType);
+        storageInfo.setAddress(bootstrapServers);
+        storageInfo.setTopicName(topicName);
+        storageInfo.setEnableCreateResource(BizConstant.DISABLE_CREATE_RESOURCE);
+        kafkaStorageId = storageService.save(storageInfo, globalOperator);
+    }
+
+    @After
+    public void deleteKafkaStorage() {
+        boolean result = storageService.delete(kafkaStorageId, BizConstant.STORAGE_KAFKA, globalOperator);
+        Assert.assertTrue(result);
+    }
+
+    @Test
+    public void testListByIdentifier() {
+        StorageResponse storage = storageService.get(kafkaStorageId, BizConstant.STORAGE_KAFKA);
+        Assert.assertEquals(globalGroupId, storage.getInlongGroupId());
+    }
+
+    @Test
+    public void testGetAndUpdate() {
+        StorageResponse response = storageService.get(kafkaStorageId, BizConstant.STORAGE_KAFKA);
+        Assert.assertEquals(globalGroupId, response.getInlongGroupId());
+
+        KafkaStorageResponse kafkaStorageResponse = (KafkaStorageResponse) response;
+        kafkaStorageResponse.setEnableCreateResource(BizConstant.ENABLE_CREATE_RESOURCE);
+        // turn the fetched response back into a request and submit it as an update
+        KafkaStorageRequest request = CommonBeanUtils.copyProperties(kafkaStorageResponse, KafkaStorageRequest::new);
+        boolean result = storageService.update(request, globalOperator);
+        Assert.assertTrue(result);
+    }
+
+}