Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2023/01/15 09:03:26 UTC

[GitHub] [inlong] featzhang opened a new pull request, #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

featzhang opened a new pull request, #7243:
URL: https://github.com/apache/inlong/pull/7243

   …dis sink
   
   ### Prepare a Pull Request
   
   
   - [INLONG-7242][Manager] Support register and manage the resource of Redis sink
   
   
   - Fixes #7242 
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the problem you're trying to solve?*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [x] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
     *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] fuweng11 commented on a diff in pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "fuweng11 (via GitHub)" <gi...@apache.org>.
fuweng11 commented on code in PR #7243:
URL: https://github.com/apache/inlong/pull/7243#discussion_r1093108913


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.redis;
+
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.IP_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.PORT_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_SAVE_FAILED;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotBlank;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotEmpty;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotNull;
+import static org.apache.inlong.manager.common.util.Preconditions.expectTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisDataType;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSchemaMapMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkDTO;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Redis sink operator, such as save or update redis field, etc.
+ */
+@Service
+public class RedisSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSinkOperator.class);
+    private static final int PORT_MAX_VALUE = 65535;
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.REDIS.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.REDIS;
+    }
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+        }
+
+        RedisSinkRequest sinkRequest = (RedisSinkRequest) request;
+
+        String clusterMode = sinkRequest.getClusterMode();
+        RedisClusterMode redisClusterMode = RedisClusterMode.of(clusterMode);
+
+        expectNotNull(redisClusterMode,
+                "Redis ClusterMode must in one of " + Arrays.toString(RedisClusterMode.values()) + " !");
+
+        switch (redisClusterMode) {
+            case CLUSTER:
+                String clusterNodes = sinkRequest.getClusterNodes();
+                checkClusterNodes(clusterNodes);
+                break;
+            case SENTINEL:
+                String sentinelMasterName = sinkRequest.getSentinelMasterName();
+                expectNotEmpty(sentinelMasterName, "Redis MasterName of Sentinel cluster must not null!");
+                String sentinelsInfo = sinkRequest.getSentinelsInfo();
+                expectNotEmpty(sentinelsInfo, "Redis sentinelsInfo of Sentinel cluster must not null!");
+                break;
+            case STANDALONE:
+                String host = sinkRequest.getHost();
+                Integer port = sinkRequest.getPort();
+
+                expectNotEmpty(host, "Redis server host must not null!");
+                expectTrue(
+                        port != null && port > 1 && port < PORT_MAX_VALUE,
+                        "The port of the redis server must be greater than 0 and less than 65535!");
+                break;
+        }
+        RedisDataType dataType = RedisDataType.valueOf(sinkRequest.getDataType());
+        expectNotNull(dataType, "Redis DataType must not null");
+
+        RedisSchemaMapMode mapMode = RedisSchemaMapMode.valueOf(sinkRequest.getSchemaMapMode());
+        expectTrue(dataType.getMapModes().contains(mapMode),
+                "Redis schemaMapMode '" + mapMode + "' is not supported in '" + dataType + "'");
+
+        try {
+            RedisSinkDTO dto = RedisSinkDTO.getFromRequest(sinkRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Redis SinkDTO failure: %s", e.getMessage()));
+        }
+    }
+
+    private void checkClusterNodes(String clusterNodes) {
+
+        expectTrue(clusterNodes != null && !clusterNodes.isEmpty(), "the nodes of Redis cluster must not null");

Review Comment:
   Use `expectNotBlank` here instead of the manual null/empty check.
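
To illustrate why a blank check is stricter than the null/empty check in the quoted line, here is a minimal sketch. `BlankCheckSketch`, `isBlank`, and `requireNotBlank` are hypothetical stand-ins written for this example; they are not the InLong `Preconditions` API, whose exact signature and exception type are not shown in this thread. "Blank" here means null, empty, or only whitespace as stripped by `String.trim()`.

```java
// Hypothetical helper for illustration only; not the InLong Preconditions class.
final class BlankCheckSketch {

    /** True when the string is null, empty, or only whitespace (per String.trim()). */
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /** Throws when the value is blank. The exception type is an assumption. */
    static void requireNotBlank(String value, String message) {
        if (isBlank(value)) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        String clusterNodes = "   ";

        // The manual check from the diff only rejects null and "".
        boolean passesManualCheck = clusterNodes != null && !clusterNodes.isEmpty();
        System.out.println("manual null/empty check passes: " + passesManualCheck);

        // A blank check also rejects whitespace-only input.
        System.out.println("isBlank: " + isBlank(clusterNodes));
    }
}
```

With a whitespace-only value, the manual check lets the input through while the blank check rejects it, which is presumably the gap the review comment is pointing at.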



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.redis;
+
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.IP_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.PORT_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_SAVE_FAILED;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotBlank;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotEmpty;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotNull;
+import static org.apache.inlong.manager.common.util.Preconditions.expectTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisDataType;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSchemaMapMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkDTO;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Redis sink operator, such as save or update redis field, etc.
+ */
+@Service
+public class RedisSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSinkOperator.class);
+    private static final int PORT_MAX_VALUE = 65535;
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.REDIS.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.REDIS;
+    }
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+        }
+
+        RedisSinkRequest sinkRequest = (RedisSinkRequest) request;
+
+        String clusterMode = sinkRequest.getClusterMode();
+        RedisClusterMode redisClusterMode = RedisClusterMode.of(clusterMode);
+
+        expectNotNull(redisClusterMode,
+                "Redis ClusterMode must in one of " + Arrays.toString(RedisClusterMode.values()) + " !");
+
+        switch (redisClusterMode) {
+            case CLUSTER:
+                String clusterNodes = sinkRequest.getClusterNodes();
+                checkClusterNodes(clusterNodes);
+                break;
+            case SENTINEL:
+                String sentinelMasterName = sinkRequest.getSentinelMasterName();
+                expectNotEmpty(sentinelMasterName, "Redis MasterName of Sentinel cluster must not null!");
+                String sentinelsInfo = sinkRequest.getSentinelsInfo();
+                expectNotEmpty(sentinelsInfo, "Redis sentinelsInfo of Sentinel cluster must not null!");
+                break;
+            case STANDALONE:
+                String host = sinkRequest.getHost();
+                Integer port = sinkRequest.getPort();
+
+                expectNotEmpty(host, "Redis server host must not null!");

Review Comment:
   Use `expectNotBlank` instead of `expectNotEmpty` here.
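
A related observation on the same `STANDALONE` branch: the longer hunks quoted in this review validate the port with `port != null && port > 1 && port < PORT_MAX_VALUE`, while the error message reads "greater than 0 and less than 65535". As written, that condition rejects ports 1 and 65535. Whether this is intended is not stated in the thread. The sketch below shows an inclusive range check under the assumption that every port from 1 to 65535 should be accepted; `PortRangeSketch`, `PORT_MIN_VALUE`, and `isValidPort` are hypothetical names.

```java
// Hypothetical sketch; assumes ports 1..65535 inclusive are acceptable.
final class PortRangeSketch {

    static final int PORT_MIN_VALUE = 1;     // assumption: port 0 is not a usable target
    static final int PORT_MAX_VALUE = 65535; // same value as the constant in the diff

    /** True when the port is non-null and within the inclusive range. */
    static boolean isValidPort(Integer port) {
        return port != null && port >= PORT_MIN_VALUE && port <= PORT_MAX_VALUE;
    }

    public static void main(String[] args) {
        Integer[] candidates = {null, 0, 1, 6379, 65535, 65536};
        for (Integer port : candidates) {
            System.out.println(port + " -> " + isValidPort(port));
        }
    }
}
```

If excluding the boundary values is deliberate, the alternative fix would be to adjust the error message so that it matches the condition.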



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.redis;
+
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.IP_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.PORT_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_SAVE_FAILED;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotBlank;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotEmpty;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotNull;
+import static org.apache.inlong.manager.common.util.Preconditions.expectTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisDataType;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSchemaMapMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkDTO;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Redis sink operator, such as save or update redis field, etc.
+ */
+@Service
+public class RedisSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSinkOperator.class);
+    private static final int PORT_MAX_VALUE = 65535;
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.REDIS.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.REDIS;
+    }
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+        }
+
+        RedisSinkRequest sinkRequest = (RedisSinkRequest) request;
+
+        String clusterMode = sinkRequest.getClusterMode();
+        RedisClusterMode redisClusterMode = RedisClusterMode.of(clusterMode);
+
+        expectNotNull(redisClusterMode,
+                "Redis ClusterMode must in one of " + Arrays.toString(RedisClusterMode.values()) + " !");
+
+        switch (redisClusterMode) {
+            case CLUSTER:
+                String clusterNodes = sinkRequest.getClusterNodes();
+                checkClusterNodes(clusterNodes);
+                break;
+            case SENTINEL:
+                String sentinelMasterName = sinkRequest.getSentinelMasterName();
+                expectNotEmpty(sentinelMasterName, "Redis MasterName of Sentinel cluster must not null!");
+                String sentinelsInfo = sinkRequest.getSentinelsInfo();
+                expectNotEmpty(sentinelsInfo, "Redis sentinelsInfo of Sentinel cluster must not null!");

Review Comment:
   Use `expectNotBlank` instead of `expectNotEmpty` here.



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.redis;
+
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.IP_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.PORT_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_SAVE_FAILED;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotBlank;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotEmpty;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotNull;
+import static org.apache.inlong.manager.common.util.Preconditions.expectTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisDataType;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSchemaMapMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkDTO;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Redis sink operator, such as save or update redis field, etc.
+ */
+@Service
+public class RedisSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSinkOperator.class);
+    private static final int PORT_MAX_VALUE = 65535;
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.REDIS.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.REDIS;
+    }
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+        }
+
+        RedisSinkRequest sinkRequest = (RedisSinkRequest) request;
+
+        String clusterMode = sinkRequest.getClusterMode();
+        RedisClusterMode redisClusterMode = RedisClusterMode.of(clusterMode);
+
+        expectNotNull(redisClusterMode,
+                "Redis ClusterMode must in one of " + Arrays.toString(RedisClusterMode.values()) + " !");
+
+        switch (redisClusterMode) {
+            case CLUSTER:
+                String clusterNodes = sinkRequest.getClusterNodes();
+                checkClusterNodes(clusterNodes);
+                break;
+            case SENTINEL:
+                String sentinelMasterName = sinkRequest.getSentinelMasterName();
+                expectNotEmpty(sentinelMasterName, "Redis MasterName of Sentinel cluster must not null!");

Review Comment:
   Use `expectNotBlank` instead of `expectNotEmpty` here.



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.redis;
+
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.IP_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.PORT_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_SAVE_FAILED;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotBlank;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotEmpty;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotNull;
+import static org.apache.inlong.manager.common.util.Preconditions.expectTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisDataType;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSchemaMapMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkDTO;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Redis sink operator, such as save or update redis field, etc.
+ */
+@Service
+public class RedisSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSinkOperator.class);
+    private static final int PORT_MAX_VALUE = 65535;
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.REDIS.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.REDIS;
+    }
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+        }
+
+        RedisSinkRequest sinkRequest = (RedisSinkRequest) request;
+
+        String clusterMode = sinkRequest.getClusterMode();
+        RedisClusterMode redisClusterMode = RedisClusterMode.of(clusterMode);
+
+        expectNotNull(redisClusterMode,
+                "Redis ClusterMode must in one of " + Arrays.toString(RedisClusterMode.values()) + " !");
+
+        switch (redisClusterMode) {
+            case CLUSTER:
+                String clusterNodes = sinkRequest.getClusterNodes();
+                checkClusterNodes(clusterNodes);
+                break;
+            case SENTINEL:
+                String sentinelMasterName = sinkRequest.getSentinelMasterName();
+                expectNotEmpty(sentinelMasterName, "Redis MasterName of Sentinel cluster must not null!");
+                String sentinelsInfo = sinkRequest.getSentinelsInfo();
+                expectNotEmpty(sentinelsInfo, "Redis sentinelsInfo of Sentinel cluster must not null!");
+                break;
+            case STANDALONE:
+                String host = sinkRequest.getHost();
+                Integer port = sinkRequest.getPort();
+
+                expectNotEmpty(host, "Redis server host must not null!");
+                expectTrue(
+                        port != null && port > 1 && port < PORT_MAX_VALUE,
+                        "The port of the redis server must be greater than 0 and less than 65535!");
+                break;
+        }
+        RedisDataType dataType = RedisDataType.valueOf(sinkRequest.getDataType());
+        expectNotNull(dataType, "Redis DataType must not null");
+
+        RedisSchemaMapMode mapMode = RedisSchemaMapMode.valueOf(sinkRequest.getSchemaMapMode());
+        expectTrue(dataType.getMapModes().contains(mapMode),
+                "Redis schemaMapMode '" + mapMode + "' is not supported in '" + dataType + "'");
+
+        try {
+            RedisSinkDTO dto = RedisSinkDTO.getFromRequest(sinkRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Redis SinkDTO failure: %s", e.getMessage()));
+        }
+    }
+
+    private void checkClusterNodes(String clusterNodes) {
+
+        expectTrue(clusterNodes != null && !clusterNodes.isEmpty(), "the nodes of Redis cluster must not null");
+        String[] nodeArray = clusterNodes.split(",");
+        expectNotEmpty(nodeArray, "the nodes of Redis cluster must not null");
+
+        for (String node : nodeArray) {
+            expectTrue(node != null && !node.isEmpty(), "Redis server host must not null!");

Review Comment:
   Use `expectNotBlank` here instead of the manual null/empty check.
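
The node-list validation in `checkClusterNodes` could be sketched roughly as follows. This is a standalone illustration, not the PR's code: `ClusterNodesSketch` and `parseNodes` are hypothetical names, the exception type is an assumption, and the format of each node entry is left unchecked because the hunk is cut off before that part. One detail worth noting is that `String.split(",")` drops trailing empty strings, so an input such as `"a:1,"` yields a single element; passing a limit of `-1` keeps the empty entry so it can be rejected.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of comma-separated node validation with blank checks.
final class ClusterNodesSketch {

    /** Splits the list on commas, trims each entry, and rejects blank entries. */
    static List<String> parseNodes(String clusterNodes) {
        if (clusterNodes == null || clusterNodes.trim().isEmpty()) {
            throw new IllegalArgumentException("the nodes of Redis cluster must not be blank");
        }
        List<String> nodes = new ArrayList<>();
        // Limit -1 keeps trailing empty entries, so "a:1," is rejected below
        // instead of being silently accepted as a single node.
        for (String raw : clusterNodes.split(",", -1)) {
            String node = raw.trim();
            if (node.isEmpty()) {
                throw new IllegalArgumentException("blank node entry in: " + clusterNodes);
            }
            nodes.add(node);
        }
        return nodes;
    }

    public static void main(String[] args) {
        System.out.println(parseNodes("127.0.0.1:6379, 127.0.0.1:6380"));
    }
}
```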





[GitHub] [inlong] featzhang commented on pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by GitBox <gi...@apache.org>.
featzhang commented on PR #7243:
URL: https://github.com/apache/inlong/pull/7243#issuecomment-1385643721

   This PR depends on changes to the Sort module, so the merge pipeline did not pass.




[GitHub] [inlong] fuweng11 commented on a diff in pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "fuweng11 (via GitHub)" <gi...@apache.org>.
fuweng11 commented on code in PR #7243:
URL: https://github.com/apache/inlong/pull/7243#discussion_r1092836785


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.redis;
+
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.IP_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.PORT_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_SAVE_FAILED;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotBlank;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotEmpty;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotNull;
+import static org.apache.inlong.manager.common.util.Preconditions.expectTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisDataType;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSchemaMapMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkDTO;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Redis sink operator, such as save or update redis field, etc.
+ */
+@Service
+public class RedisSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSinkOperator.class);
+    private static final int PORT_MAX_VALUE = 65535;
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.REDIS.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.REDIS;
+    }
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+        }
+
+        RedisSinkRequest sinkRequest = (RedisSinkRequest) request;
+
+        String clusterMode = sinkRequest.getClusterMode();
+        RedisClusterMode redisClusterMode = RedisClusterMode.of(clusterMode);
+
+        expectNotNull(redisClusterMode,
+                "Redis ClusterMode must in one of " + Arrays.toString(RedisClusterMode.values()) + " !");
+
+        switch (redisClusterMode) {
+            case CLUSTER:
+                String clusterNodes = sinkRequest.getClusterNodes();
+                checkClusterNodes(clusterNodes);
+                break;
+            case SENTINEL:
+                String sentinelMasterName = sinkRequest.getSentinelMasterName();
+                expectNotEmpty(sentinelMasterName, "Redis MasterName of Sentinel cluster must not null!");
+                String sentinelsInfo = sinkRequest.getSentinelsInfo();
+                expectNotEmpty(sentinelsInfo, "Redis sentinelsInfo of Sentinel cluster must not null!");
+                break;
+            case STANDALONE:
+                String host = sinkRequest.getHost();
+                Integer port = sinkRequest.getPort();
+
+                expectNotEmpty(host, "Redis server host must not null!");
+                expectTrue(

Review Comment:
   `expectTrue(port != null && port > 0 && port < PORT_MAX_VALUE,
                           "The port of the redis server must be greater than 0 and less than 65535!")`
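
A minimal sketch of the kind of range check suggested above, assuming valid TCP ports run from 1 to 65535 inclusive (so both bounds are accepted here, which is slightly wider than the quoted message). `PortCheckSketch` and `isValidPort` are hypothetical names used only for illustration and are not part of InLong; the real operator would pass the resulting boolean to its own precondition helper.

```java
final class PortCheckSketch {

    // Highest valid TCP port number (assumption: 65535 itself is accepted).
    private static final int PORT_MAX_VALUE = 65535;

    // Returns true only when the port is present and within 1..65535 inclusive.
    static boolean isValidPort(Integer port) {
        return port != null && port >= 1 && port <= PORT_MAX_VALUE;
    }

    public static void main(String[] args) {
        // Print the result for a typical Redis port, an out-of-range port, and a missing port.
        System.out.println(isValidPort(6379));
        System.out.println(isValidPort(0));
        System.out.println(isValidPort(null));
    }
}
```

Note that the boolean handed to a `checkTrue`/`expectTrue` style helper has to describe the valid case; passing the invalid case would reject every well-formed port.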





[GitHub] [inlong] fuweng11 commented on a diff in pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "fuweng11 (via GitHub)" <gi...@apache.org>.
fuweng11 commented on code in PR #7243:
URL: https://github.com/apache/inlong/pull/7243#discussion_r1092770595


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisColumnInfo.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.redis;
+
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+
+/**
+ * Redis column info.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonTypeDefine(value = SinkType.REDIS)
+public class RedisColumnInfo extends SinkField {
+
+    private Boolean isSortKey = false;

Review Comment:
   If you want to save these parameters, you need to override the methods `AbstractSinkOperator.getSinkFields` and `AbstractSinkOperator.saveFieldOpt`, because inlong-dashboard does not put these arguments into `request.ext_param`.





[GitHub] [inlong] featzhang commented on a diff in pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "featzhang (via GitHub)" <gi...@apache.org>.
featzhang commented on code in PR #7243:
URL: https://github.com/apache/inlong/pull/7243#discussion_r1091421735


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -651,4 +715,59 @@ public static void checkPartitionField(List<SinkField> fieldList, List<HiveParti
         }
     }
 
+    /**
+     * Parse format
+     *
+     * @param formatName        data serialization, support: csv, json, canal, avro, etc
+     * @param wrapWithInlongMsg whether wrap content with {@link InLongMsgFormat}
+     * @param separatorStr      the separator of data content
+     * @param ignoreParseErrors whether ignore deserialization error data
+     * @return the format for serialized content
+     */
+    private static Format parsingFormat(

Review Comment:
   This is a general implementation, and other newly added storages can also use this method.





[GitHub] [inlong] fuweng11 commented on a diff in pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by GitBox <gi...@apache.org>.
fuweng11 commented on code in PR #7243:
URL: https://github.com/apache/inlong/pull/7243#discussion_r1070789801


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeInfo.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+/**
+ * Hudi data node info

Review Comment:
   Redis data node info.



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeInfo.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+/**
+ * Hudi data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HUDI)
+@ApiModel("Hudi data node info")
+public class RedisDataNodeInfo extends DataNodeInfo {
+
+    @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+    private String catalogType = "HIVE";
+
+    @ApiModelProperty("Hudi data warehouse dir")

Review Comment:
   Hudi?



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeRequest.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+/**
+ * Hudi data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HUDI)
+@ApiModel("Hudi data node request")

Review Comment:
   Redis data node request



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeInfo.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+/**
+ * Hudi data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HUDI)
+@ApiModel("Hudi data node info")

Review Comment:
   Redis data node info.



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeInfo.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+/**
+ * Hudi data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HUDI)

Review Comment:
   DataNodeType.REDIS



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeRequest.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+/**
+ * Hudi data node request

Review Comment:
   Redis data node request



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeRequest.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+/**
+ * Hudi data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HUDI)
+@ApiModel("Hudi data node request")
+public class RedisDataNodeRequest extends DataNodeRequest {
+
+    @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+    private String catalogType = "HIVE";
+
+    @ApiModelProperty("Hudi data warehouse dir")

Review Comment:
   Hudi?



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/redis/RedisDataNodeInfo.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+/**
+ * Hudi data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HUDI)
+@ApiModel("Hudi data node info")
+public class RedisDataNodeInfo extends DataNodeInfo {
+
+    @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+    private String catalogType = "HIVE";
+
+    @ApiModelProperty("Hudi data warehouse dir")
+    private String warehouse;
+
+    public RedisDataNodeInfo() {
+        this.setType(DataNodeType.HUDI);

Review Comment:
   DataNodeType.REDIS





[GitHub] [inlong] featzhang commented on a diff in pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "featzhang (via GitHub)" <gi...@apache.org>.
featzhang commented on code in PR #7243:
URL: https://github.com/apache/inlong/pull/7243#discussion_r1091435748


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.redis;
+
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_SAVE_FAILED;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT;
+import static org.apache.inlong.manager.common.util.BusinessPreconditions.verify;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.BusinessPreconditions.Verification;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisColumnInfo;
+import org.apache.inlong.manager.pojo.sink.redis.RedisDataType;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSchemaMapMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkDTO;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Redis sink operator, such as save or update redis field, etc.
+ */
+@Service
+public class RedisSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSinkOperator.class);
+    private static final int PORT_MAX_VALUE = 65535;
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.REDIS.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.REDIS;
+    }
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+
+        verify(SINK_TYPE_NOT_SUPPORT)
+                .checkTrue(
+                        this.getSinkType().equals(request.getSinkType()),
+                        SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+
+        RedisSinkRequest sinkRequest = (RedisSinkRequest) request;
+
+        String clusterMode = sinkRequest.getClusterMode();
+        RedisClusterMode redisClusterMode = RedisClusterMode.of(clusterMode);
+        Verification saveFailedVerify = verify(SINK_SAVE_FAILED);
+        saveFailedVerify
+                .checkNotNull(
+                        redisClusterMode,
+                        "Redis ClusterMode must in [" + Arrays.toString(RedisClusterMode.values()) + "] !");
+
+        switch (redisClusterMode) {
+            case CLUSTER:
+                String clusterNodes = sinkRequest.getClusterNodes();
+                checkClusterNodes(clusterNodes);
+                break;
+            case SENTINEL:
+                String sentinelMasterName = sinkRequest.getSentinelMasterName();
+                saveFailedVerify.checkNotEmpty(sentinelMasterName,
+                        "Redis MasterName of Sentinel cluster must not null!");
+                String sentinelsInfo = sinkRequest.getSentinelsInfo();
+                saveFailedVerify.checkNotEmpty(sentinelsInfo, "Redis sentinelsInfo of Sentinel cluster must not null!");
+                break;
+            case STANDALONE:
+                String host = sinkRequest.getHost();
+                Integer port = sinkRequest.getPort();
+                saveFailedVerify.checkNotEmpty(host, "Redis server host must not null!");
+                saveFailedVerify.checkTrue(
+                        port == null || port < 1 || port > PORT_MAX_VALUE,
+                        "The port of the redis server must be greater than 0 and less than 65535!");
+                break;
+        }
+        RedisDataType dataType = RedisDataType.valueOf(sinkRequest.getDataType());
+        saveFailedVerify.checkNotNull(dataType, "Redis DataType must not null");
+        RedisSchemaMapMode mapMode = RedisSchemaMapMode.valueOf(sinkRequest.getSchemaMapMode());
+        saveFailedVerify.checkTrue(
+                dataType.getMapModes().contains(mapMode),
+                "Redis schemaMapMode '" + mapMode + "' is not supported in '" + dataType + "'");
+
+        try {
+            RedisSinkDTO dto = RedisSinkDTO.getFromRequest(sinkRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Redis SinkDTO failure: %s", e.getMessage()));
+        }
+    }
+
+    private void checkClusterNodes(String clusterNodes) {
+
+        Verification verify = verify(SINK_SAVE_FAILED);
+        verify.checkNotBlank(clusterNodes, "the nodes of Redis cluster must not null");
+        String[] nodeArray = clusterNodes.split(",");
+        verify.checkNotEmpty(nodeArray, "the nodes of Redis cluster must not null");
+
+        for (String node : nodeArray) {
+            verify.checkNotBlank(node, "Redis server host must not null!");
+            String[] ipPort = node.split(":");
+            verify.checkTrue(ipPort.length == 2, "The ip and port of Redis server must be in form: ip:port");
+            verify.checkNotBlank(ipPort[0], "The ip can not be null");
+            verify.checkNotBlank(ipPort[1], "The port can not be null");
+        }
+    }
+
+    @Override
+    public StreamSink getFromEntity(StreamSinkEntity entity) {
+        RedisSink sink = new RedisSink();
+        if (entity == null) {
+            return sink;
+        }
+
+        RedisSinkDTO dto = RedisSinkDTO.getFromJson(entity.getExtParams());
+
+        CommonBeanUtils.copyProperties(entity, sink, true);
+        CommonBeanUtils.copyProperties(dto, sink, true);
+        List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+        sink.setSinkFieldList(sinkFields);
+        return sink;
+    }
+
+    @Override
+    protected void checkFieldInfo(SinkField field) {

Review Comment:
   Removed this function.





[GitHub] [inlong] featzhang commented on a diff in pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "featzhang (via GitHub)" <gi...@apache.org>.
featzhang commented on code in PR #7243:
URL: https://github.com/apache/inlong/pull/7243#discussion_r1092777324


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisColumnInfo.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.redis;
+
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+
+/**
+ * Redis column info.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonTypeDefine(value = SinkType.REDIS)
+public class RedisColumnInfo extends SinkField {
+
+    private Boolean isSortKey = false;

Review Comment:
   @fuweng11 Sorry! This class is not used, so I removed it.





[GitHub] [inlong] dockerzhang merged pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "dockerzhang (via GitHub)" <gi...@apache.org>.
dockerzhang merged PR #7243:
URL: https://github.com/apache/inlong/pull/7243




[GitHub] [inlong] gosonzhang commented on a diff in pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "gosonzhang (via GitHub)" <gi...@apache.org>.
gosonzhang commented on code in PR #7243:
URL: https://github.com/apache/inlong/pull/7243#discussion_r1090105811


##########
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/BusinessPreconditions.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+/**
+ * Parameter verification tools
+ */
+public class BusinessPreconditions {

Review Comment:
   In terms of implementation, this class duplicates what `Preconditions` already does. I recommend removing it; if `Preconditions` does not meet the requirements, adjust `Preconditions` instead.
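
   One way to read this suggestion is a single helper class whose static checks throw a business exception carrying an error code. The sketch below is only an illustration under that assumption: `PreconditionsSketch`, `SketchBusinessException`, and the string error codes are hypothetical stand-ins, and the real `Preconditions` and `BusinessException` in inlong-manager may have different signatures.

```java
// Hypothetical sketch: one precondition helper instead of two parallel classes.
// None of these names or signatures are taken from the InLong code base.
class PreconditionsSketch {

    /** Stand-in for a business exception that carries an error code. */
    static class SketchBusinessException extends RuntimeException {

        private final String errorCode;

        SketchBusinessException(String errorCode, String message) {
            super(message);
            this.errorCode = errorCode;
        }

        String getErrorCode() {
            return errorCode;
        }
    }

    /** Throws if the object is null. */
    static void expectNotNull(Object obj, String errorCode, String message) {
        if (obj == null) {
            throw new SketchBusinessException(errorCode, message);
        }
    }

    /** Throws if the string is null, empty, or whitespace only. */
    static void expectNotBlank(String str, String errorCode, String message) {
        if (str == null || str.trim().isEmpty()) {
            throw new SketchBusinessException(errorCode, message);
        }
    }

    /** Throws if the condition is false. */
    static void expectTrue(boolean condition, String errorCode, String message) {
        if (!condition) {
            throw new SketchBusinessException(errorCode, message);
        }
    }
}
```

   With static methods like these, callers can use static imports and do not need a separate `verify(...)` builder object.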





[GitHub] [inlong] fuweng11 commented on a diff in pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "fuweng11 (via GitHub)" <gi...@apache.org>.
fuweng11 commented on code in PR #7243:
URL: https://github.com/apache/inlong/pull/7243#discussion_r1092785514


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisSink.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.redis;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Redis sink info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Redis sink info")
+@JsonTypeDefine(value = SinkType.REDIS)
+public class RedisSink extends StreamSink {
+
+    @ApiModelProperty("Redis cluster mode")
+    private String clusterMode;
+
+    @ApiModelProperty("Redis database id")
+    private Integer database;
+
+    @ApiModelProperty("Redis data type")
+    private String dataType;
+
+    @ApiModelProperty("Redis schema mapping mode")
+    private String schemaMapMode;
+
+    @ApiModelProperty("Password for Redis accessing")
+    private String password;
+
+    @ApiModelProperty("Database name")
+    private String databaseName;
+
+    @ApiModelProperty("Expire time of Redis row")
+    private Integer ttl;
+
+    @ApiModelProperty("The timeout of Redis client")
+    private Integer timeout;
+
+    @ApiModelProperty("The socket timeout of redis client")
+    private Integer soTimeout;
+
+    @ApiModelProperty("The max total of sink client")
+    private Integer maxTotal;
+
+    @ApiModelProperty("The max idle of sink client")
+    private Integer maxIdle;
+
+    @ApiModelProperty("The min idle of sink client")
+    private Integer minIdle;
+
+    @ApiModelProperty("The max retry time")
+    private Integer maxRetries;
+
+    @ApiModelProperty("The host of Redis server")
+    private String host;
+
+    @ApiModelProperty("The port of Redis server")
+    private Integer port;
+
+    @ApiModelProperty("The master name of Redis sentinel cluster")
+    private String sentinelMasterName;
+
+    @ApiModelProperty("The sentinels info of Redis sentinel cluster")
+    private String sentinelsInfo;
+
+    /**
+     * The address of redis server, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
+     * If server is not cluster mode, server address format eg: 127.0.0.1:8080 .
+     */
+    @ApiModelProperty("The cluster nodes of Redis cluster")
+    private String clusterNodes;
+
+    @ApiModelProperty("The DataEncoding of Redis STATIC_PREFIX_MATCH data-type")
+    private String formatDataEncoding;
+
+    @ApiModelProperty("The DataType of Redis STATIC_PREFIX_MATCH data-type")
+    private String formatDataType;
+
+    @ApiModelProperty("Whether ignore parse error of Redis STATIC_PREFIX_MATCH data-type")
+    private Boolean formatIgnoreParseError;
+
+    @ApiModelProperty("The data separator of Redis STATIC_PREFIX_MATCH data-type")
+    private String formatDataSeparator;
+    @ApiModelProperty("Extended properties")
+    private List<HashMap<String, String>> extList;

Review Comment:
   Why not use `StreamSink.properties`? Why is `extList` used here, but `properties` used in RedisSinkDTO?



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -651,4 +715,59 @@ public static void checkPartitionField(List<SinkField> fieldList, List<HiveParti
         }
     }
 
+    /**
+     * Parse format
+     *
+     * @param formatName        data serialization, support: csv, json, canal, avro, etc
+     * @param wrapWithInlongMsg whether wrap content with {@link InLongMsgFormat}
+     * @param separatorStr      the separator of data content
+     * @param ignoreParseErrors whether ignore deserialization error data
+     * @return the format for serialized content
+     */
+    private static Format parsingFormat(

Review Comment:
   But it seems that each type of sink supports a different datatype.





[GitHub] [inlong] fuweng11 commented on a diff in pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "fuweng11 (via GitHub)" <gi...@apache.org>.
fuweng11 commented on code in PR #7243:
URL: https://github.com/apache/inlong/pull/7243#discussion_r1090107422


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -651,4 +715,59 @@ public static void checkPartitionField(List<SinkField> fieldList, List<HiveParti
         }
     }
 
+    /**
+     * Parse format
+     *
+     * @param formatName        data serialization, support: csv, json, canal, avro, etc
+     * @param wrapWithInlongMsg whether wrap content with {@link InLongMsgFormat}
+     * @param separatorStr      the separator of data content
+     * @param ignoreParseErrors whether ignore deserialization error data
+     * @return the format for serialized content
+     */
+    private static Format parsingFormat(

Review Comment:
   Is this only used by Redis? If so, I suggest changing the name.



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/redis/RedisResourceOperator.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.redis;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+/**
+ * Redis resource operator
+ */
+@Service
+public class RedisResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisResourceOperator.class);
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.REDIS.equals(sinkType);
+    }
+
+    /**
+     * Create Redis table according to the sink config
+     */

Review Comment:
   Add `@Override` here.



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/redis/RedisResourceOperator.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.redis;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+/**
+ * Redis resource operator
+ */
+@Service
+public class RedisResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisResourceOperator.class);
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.REDIS.equals(sinkType);
+    }
+
+    /**
+     * Create Redis table according to the sink config
+     */
+    public void createSinkResource(SinkInfo sinkInfo) {

Review Comment:
   Does this not create a table? If so, I recommend adding a log statement.
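
   A minimal sketch of what that could look like, assuming the answer to the question above is yes and the method is intentionally a no-op. `RedisResourceOperatorSketch` is a simplified, hypothetical stand-in: the real class implements `SinkResourceOperator`, takes a `SinkInfo`, and logs through SLF4J, while this version uses `java.util.logging`, a plain sink id, and an assumed `"REDIS"` type string so that it compiles on its own.

```java
import java.util.logging.Logger;

// Hypothetical, self-contained stand-in for RedisResourceOperator.
class RedisResourceOperatorSketch {

    private static final Logger LOGGER =
            Logger.getLogger(RedisResourceOperatorSketch.class.getName());

    /** Accepts only the Redis sink type ("REDIS" is an assumed constant value). */
    boolean accept(String sinkType) {
        return "REDIS".equals(sinkType);
    }

    /**
     * Nothing is created for Redis in this sketch; the method only records
     * that it was called, so the skipped step is visible in the logs.
     * Returns the logged message to make the behaviour easy to inspect.
     */
    String createSinkResource(Integer sinkId) {
        String message = "Redis sink [" + sinkId + "] needs no resource to be created, skipping";
        LOGGER.info(message);
        return message;
    }
}
```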





[GitHub] [inlong] featzhang commented on a diff in pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "featzhang (via GitHub)" <gi...@apache.org>.
featzhang commented on code in PR #7243:
URL: https://github.com/apache/inlong/pull/7243#discussion_r1091423317


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/redis/RedisResourceOperator.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.redis;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+/**
+ * Redis resource operator
+ */
+@Service
+public class RedisResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisResourceOperator.class);
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.REDIS.equals(sinkType);
+    }
+
+    /**
+     * Create Redis table according to the sink config
+     */

Review Comment:
   done





[GitHub] [inlong] gosonzhang commented on a diff in pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "gosonzhang (via GitHub)" <gi...@apache.org>.
gosonzhang commented on code in PR #7243:
URL: https://github.com/apache/inlong/pull/7243#discussion_r1093094269


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.redis;
+
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.IP_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.PORT_EMPTY;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_SAVE_FAILED;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotBlank;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotEmpty;
+import static org.apache.inlong.manager.common.util.Preconditions.expectNotNull;
+import static org.apache.inlong.manager.common.util.Preconditions.expectTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisDataType;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSchemaMapMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkDTO;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Redis sink operator, such as save or update redis field, etc.
+ */
+@Service
+public class RedisSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSinkOperator.class);
+    private static final int PORT_MAX_VALUE = 65535;
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.REDIS.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.REDIS;
+    }
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+        }
+
+        RedisSinkRequest sinkRequest = (RedisSinkRequest) request;
+
+        String clusterMode = sinkRequest.getClusterMode();
+        RedisClusterMode redisClusterMode = RedisClusterMode.of(clusterMode);
+
+        expectNotNull(redisClusterMode,

Review Comment:
   Should the not-null check be placed inside the conversion (`RedisClusterMode.of`), or should the conversion return a default value?
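
   The two options in the question above could look roughly like this. `ClusterModeSketch` is a hypothetical stand-in for `RedisClusterMode`; only the constant names `CLUSTER`, `SENTINEL`, and `STANDALONE` come from the diff, and the case-insensitive matching and the exception type are assumptions.

```java
import java.util.Arrays;

// Hypothetical stand-in for RedisClusterMode, showing both alternatives.
enum ClusterModeSketch {

    CLUSTER, SENTINEL, STANDALONE;

    /**
     * Option 1: validate inside the conversion.
     * Never returns null, so callers need no separate not-null check.
     */
    static ClusterModeSketch of(String name) {
        for (ClusterModeSketch mode : values()) {
            if (mode.name().equalsIgnoreCase(name)) {
                return mode;
            }
        }
        throw new IllegalArgumentException(
                "Redis ClusterMode must be one of " + Arrays.toString(values()) + ", got: " + name);
    }

    /** Option 2: fall back to a caller-supplied default for unknown or null input. */
    static ClusterModeSketch ofOrDefault(String name, ClusterModeSketch defaultMode) {
        for (ClusterModeSketch mode : values()) {
            if (mode.name().equalsIgnoreCase(name)) {
                return mode;
            }
        }
        return defaultMode;
    }
}
```

   Option 1 keeps invalid input from reaching the `switch`, while option 2 trades strictness for leniency; which one fits depends on whether a silent default is acceptable for a sink configuration.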





[GitHub] [inlong] fuweng11 commented on a diff in pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "fuweng11 (via GitHub)" <gi...@apache.org>.
fuweng11 commented on code in PR #7243:
URL: https://github.com/apache/inlong/pull/7243#discussion_r1092770595


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/redis/RedisColumnInfo.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.redis;
+
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+
+/**
+ * Redis column info.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonTypeDefine(value = SinkType.REDIS)
+public class RedisColumnInfo extends SinkField {
+
+    private Boolean isSortKey = false;

Review Comment:
   If you want to save these parameters, you need to override the methods `RedisSinkOperator.getSinkFields` and `RedisSinkOperator.saveFieldOpt`, because inlong-dashboard does not put these arguments into `request.ext_param`.





[GitHub] [inlong] featzhang commented on pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "featzhang (via GitHub)" <gi...@apache.org>.
featzhang commented on PR #7243:
URL: https://github.com/apache/inlong/pull/7243#issuecomment-1412005189

   This PR depends on a modification to Sort, so the pipeline fails.




[GitHub] [inlong] fuweng11 commented on a diff in pull request #7243: [INLONG-7242][Manager] Support register and manage the resource of Redis sink

Posted by "fuweng11 (via GitHub)" <gi...@apache.org>.
fuweng11 commented on code in PR #7243:
URL: https://github.com/apache/inlong/pull/7243#discussion_r1090120694


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/redis/RedisSinkOperator.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.redis;
+
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_SAVE_FAILED;
+import static org.apache.inlong.manager.common.enums.ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT;
+import static org.apache.inlong.manager.common.util.BusinessPreconditions.verify;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.BusinessPreconditions.Verification;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisClusterMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisColumnInfo;
+import org.apache.inlong.manager.pojo.sink.redis.RedisDataType;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSchemaMapMode;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkDTO;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Redis sink operator, such as save or update redis field, etc.
+ */
+@Service
+public class RedisSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSinkOperator.class);
+    private static final int PORT_MAX_VALUE = 65535;
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.REDIS.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.REDIS;
+    }
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+
+        verify(SINK_TYPE_NOT_SUPPORT)
+                .checkTrue(
+                        this.getSinkType().equals(request.getSinkType()),
+                        SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+
+        RedisSinkRequest sinkRequest = (RedisSinkRequest) request;
+
+        String clusterMode = sinkRequest.getClusterMode();
+        RedisClusterMode redisClusterMode = RedisClusterMode.of(clusterMode);
+        Verification saveFailedVerify = verify(SINK_SAVE_FAILED);
+        saveFailedVerify
+                .checkNotNull(
+                        redisClusterMode,
+                        "Redis ClusterMode must in [" + Arrays.toString(RedisClusterMode.values()) + "] !");
+
+        switch (redisClusterMode) {
+            case CLUSTER:
+                String clusterNodes = sinkRequest.getClusterNodes();
+                checkClusterNodes(clusterNodes);
+                break;
+            case SENTINEL:
+                String sentinelMasterName = sinkRequest.getSentinelMasterName();
+                saveFailedVerify.checkNotEmpty(sentinelMasterName,
+                        "Redis MasterName of Sentinel cluster must not null!");
+                String sentinelsInfo = sinkRequest.getSentinelsInfo();
+                saveFailedVerify.checkNotEmpty(sentinelsInfo, "Redis sentinelsInfo of Sentinel cluster must not null!");
+                break;
+            case STANDALONE:
+                String host = sinkRequest.getHost();
+                Integer port = sinkRequest.getPort();
+                saveFailedVerify.checkNotEmpty(host, "Redis server host must not null!");
+                saveFailedVerify.checkTrue(
+                        port != null && port >= 1 && port <= PORT_MAX_VALUE,
+                        "The port of the redis server must be between 1 and 65535!");
+                break;
+        }
+        RedisDataType dataType = RedisDataType.valueOf(sinkRequest.getDataType());
+        saveFailedVerify.checkNotNull(dataType, "Redis DataType must not null");
+        RedisSchemaMapMode mapMode = RedisSchemaMapMode.valueOf(sinkRequest.getSchemaMapMode());
+        saveFailedVerify.checkTrue(
+                dataType.getMapModes().contains(mapMode),
+                "Redis schemaMapMode '" + mapMode + "' is not supported in '" + dataType + "'");
+
+        try {
+            RedisSinkDTO dto = RedisSinkDTO.getFromRequest(sinkRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(SINK_SAVE_FAILED,
+                    String.format("failed to serialize extParams of Redis SinkDTO: %s", e.getMessage()));
+        }
+    }
+
+    private void checkClusterNodes(String clusterNodes) {
+
+        Verification verify = verify(SINK_SAVE_FAILED);
+        verify.checkNotBlank(clusterNodes, "The nodes of the Redis cluster must not be blank");
+        String[] nodeArray = clusterNodes.split(",");
+        verify.checkNotEmpty(nodeArray, "The nodes of the Redis cluster must not be empty");
+
+        for (String node : nodeArray) {
+            verify.checkNotBlank(node, "A Redis cluster node must not be blank!");
+            String[] ipPort = node.split(":");
+            verify.checkTrue(ipPort.length == 2, "The ip and port of Redis server must be in form: ip:port");
+            verify.checkNotBlank(ipPort[0], "The ip must not be blank");
+            verify.checkNotBlank(ipPort[1], "The port must not be blank");
+        }
+        }
+    }
+
+    @Override
+    public StreamSink getFromEntity(StreamSinkEntity entity) {
+        RedisSink sink = new RedisSink();
+        if (entity == null) {
+            return sink;
+        }
+
+        RedisSinkDTO dto = RedisSinkDTO.getFromJson(entity.getExtParams());
+
+        CommonBeanUtils.copyProperties(entity, sink, true);
+        CommonBeanUtils.copyProperties(dto, sink, true);
+        List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+        sink.setSinkFieldList(sinkFields);
+        return sink;
+    }
+
+    @Override
+    protected void checkFieldInfo(SinkField field) {

Review Comment:
   The method name does not match what the method actually does.
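
   Separately, as a standalone illustration of the `ip:port` node-list check in `checkClusterNodes` above: the sketch below is not part of the PR. The class name `RedisNodeCheck`, the method `parseNodes`, and the constant `PORT_MAX` (assumed to be 65535) are hypothetical, and it throws `IllegalArgumentException` instead of using the PR's `Verification` helper. Unlike the hunk, it also checks that each port is numeric and in range. IPv6 literals are not handled.

```java
import java.util.ArrayList;
import java.util.List;

public class RedisNodeCheck {

    // Assumed upper bound for a TCP port; the PR's PORT_MAX_VALUE is not shown in this hunk.
    static final int PORT_MAX = 65535;

    /** Parses "host1:port1,host2:port2" and throws IllegalArgumentException on malformed input. */
    static List<String[]> parseNodes(String clusterNodes) {
        if (clusterNodes == null || clusterNodes.trim().isEmpty()) {
            throw new IllegalArgumentException("cluster nodes must not be blank");
        }
        List<String[]> result = new ArrayList<>();
        for (String node : clusterNodes.split(",")) {
            String[] hostPort = node.trim().split(":");
            // Exactly one ':' separator with non-blank text on both sides.
            if (hostPort.length != 2 || hostPort[0].trim().isEmpty() || hostPort[1].trim().isEmpty()) {
                throw new IllegalArgumentException("node must be in the form host:port, got '" + node + "'");
            }
            int port;
            try {
                port = Integer.parseInt(hostPort[1].trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("port is not a number in '" + node + "'");
            }
            if (port < 1 || port > PORT_MAX) {
                throw new IllegalArgumentException("port out of range in '" + node + "'");
            }
            result.add(new String[] {hostPort[0].trim(), String.valueOf(port)});
        }
        return result;
    }
}
```

   Whether range-checking the port belongs in the manager or should be left to the Redis client is a design choice for the PR author.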


