You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/27 02:01:30 UTC

[inlong] branch master updated: [INLONG-6232][Manager] Support Apache Doris load node management (#6233)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c0c7adba2 [INLONG-6232][Manager] Support Apache Doris load node management (#6233)
c0c7adba2 is described below

commit c0c7adba2f0fc219fafee19bd22fb3a12d2cdba2
Author: Liao Rui <li...@users.noreply.github.com>
AuthorDate: Thu Oct 27 10:01:24 2022 +0800

    [INLONG-6232][Manager] Support Apache Doris load node management (#6233)
    
    Co-authored-by: ryanrliao <ry...@tencent.com>
---
 .../inlong/manager/common/consts/SinkType.java     |   2 +-
 .../inlong/manager/pojo/sink/doris/DorisSink.java  |  85 +++++++++++++++
 .../manager/pojo/sink/doris/DorisSinkDTO.java      | 121 +++++++++++++++++++++
 .../manager/pojo/sink/doris/DorisSinkRequest.java  |  68 ++++++++++++
 .../manager/pojo/sort/util/LoadNodeUtils.java      |  45 ++++++++
 .../service/sink/doris/DorisSinkOperator.java      |  92 ++++++++++++++++
 6 files changed, 412 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index e1bf78245..0832092d2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -36,5 +36,5 @@ public class SinkType {
     public static final String ORACLE = "ORACLE";
     public static final String TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL";
     public static final String DLCICEBERG = "DLCICEBERG";
-
+    public static final String DORIS = "DORIS";
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSink.java
new file mode 100644
index 000000000..f1c777c16
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSink.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.doris;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+/**
+ * Doris sink info.
+ *
+ * <p>Stream sink model for Doris; the class is annotated with {@code JsonTypeDefine} using
+ * {@link SinkType#DORIS}. In addition to the properties inherited from {@link StreamSink} it carries the FE
+ * address, the access credentials, the target table identifier, the stream-load label prefix, the primary key
+ * and the multiple-sink options (enable flag, format, database pattern and table pattern).
+ *
+ * <p>The no-arg constructor sets the sink type to {@link SinkType#DORIS}. {@link #genSinkRequest()} copies the
+ * properties of this object into a newly created {@link DorisSinkRequest}.
+ *
+ * <p>NOTE(review): {@code sinkMultipleEnable} has a plain field initializer and no {@code @Builder.Default};
+ * Lombok builders ignore such initializers, so a builder-created instance presumably leaves the flag
+ * {@code null} and does not run the no-arg constructor that sets the sink type - confirm.
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Doris sink info")
+@JsonTypeDefine(value = SinkType.DORIS)
+public class DorisSink extends StreamSink {
+
+    @ApiModelProperty("Doris FE http address")
+    private String feNodes;
+
+    @ApiModelProperty("Username for doris accessing")
+    private String username;
+
+    @ApiModelProperty("Password for doris accessing")
+    private String password;
+
+    @ApiModelProperty("Doris table name, such as: db.tbl")
+    private String tableIdentifier;
+
+    @ApiModelProperty("Label prefix for stream loading. Used for guaranteeing Flink EOS semantics, as global unique is "
+            + "needed in 2pc.")
+    private String labelPrefix;
+
+    @ApiModelProperty("The primary key of sink table")
+    private String primaryKey;
+
+    @ApiModelProperty("The multiple enable of sink")
+    private Boolean sinkMultipleEnable = false;
+
+    @ApiModelProperty("The multiple format of sink")
+    private String sinkMultipleFormat;
+
+    @ApiModelProperty("The multiple database-pattern of sink")
+    private String databasePattern;
+
+    @ApiModelProperty("The multiple table-pattern of sink")
+    private String tablePattern;
+
+    public DorisSink() {
+        this.setSinkType(SinkType.DORIS);
+    }
+
+    @Override
+    public SinkRequest genSinkRequest() {
+        return CommonBeanUtils.copyProperties(this, DorisSinkRequest::new);
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
new file mode 100644
index 000000000..60dfb1a7b
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.doris;
+
+import io.swagger.annotations.ApiModelProperty;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.AESUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+/**
+ * Sink info of Doris.
+ *
+ * <p>Data transfer object that is built from a {@link DorisSinkRequest} and parsed back from a JSON string.
+ * The password is AES-encrypted when the DTO is built from a request and decrypted again when the DTO is read
+ * from JSON; {@code encryptVersion} records which key version was used for the encryption.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class DorisSinkDTO {
+
+    @ApiModelProperty("Doris FE http address")
+    private String feNodes;
+
+    @ApiModelProperty("Username for doris accessing")
+    private String username;
+
+    @ApiModelProperty("Password for doris accessing")
+    private String password;
+
+    @ApiModelProperty("Doris table name, such as: db.tbl")
+    private String tableIdentifier;
+
+    @ApiModelProperty("Label prefix for stream loading. Used for guaranteeing Flink EOS semantics, as global unique is "
+            + "needed in 2pc.")
+    private String labelPrefix;
+
+    @ApiModelProperty("The primary key of sink table")
+    private String primaryKey;
+
+    // NOTE(review): Lombok's @Builder ignores this initializer unless @Builder.Default is added, so only the
+    // no-arg constructor yields false here; getFromRequest always sets the flag explicitly.
+    @ApiModelProperty("The multiple enable of sink")
+    private Boolean sinkMultipleEnable = false;
+
+    @ApiModelProperty("The multiple format of sink")
+    private String sinkMultipleFormat;
+
+    @ApiModelProperty("The multiple database-pattern of sink")
+    private String databasePattern;
+
+    @ApiModelProperty("The multiple table-pattern of sink")
+    private String tablePattern;
+
+    @ApiModelProperty("Password encrypt version")
+    private Integer encryptVersion;
+
+    @ApiModelProperty("Properties for doris")
+    private Map<String, Object> properties;
+
+    /**
+     * Get the dto instance from the request.
+     *
+     * <p>A non-empty password is encrypted with the current AES key version; an empty or missing password is
+     * stored as {@code null}. The key version is always recorded in {@code encryptVersion}.
+     *
+     * @param request the Doris sink request to convert
+     * @return a DTO carrying every Doris-specific property of the request, including the primary key
+     * @throws Exception propagated from {@link AESUtils} when resolving the key version or encrypting fails
+     */
+    public static DorisSinkDTO getFromRequest(DorisSinkRequest request) throws Exception {
+        Integer encryptVersion = AESUtils.getCurrentVersion(null);
+        String passwd = null;
+        if (StringUtils.isNotEmpty(request.getPassword())) {
+            passwd = AESUtils.encryptToString(request.getPassword().getBytes(StandardCharsets.UTF_8),
+                    encryptVersion);
+        }
+        return DorisSinkDTO.builder()
+                .feNodes(request.getFeNodes())
+                .username(request.getUsername())
+                .password(passwd)
+                .tableIdentifier(request.getTableIdentifier())
+                .labelPrefix(request.getLabelPrefix())
+                // The primary key was previously left out here and therefore lost whenever a sink was saved.
+                .primaryKey(request.getPrimaryKey())
+                .sinkMultipleEnable(request.getSinkMultipleEnable())
+                .sinkMultipleFormat(request.getSinkMultipleFormat())
+                .databasePattern(request.getDatabasePattern())
+                .tablePattern(request.getTablePattern())
+                .encryptVersion(encryptVersion)
+                .properties(request.getProperties())
+                .build();
+    }
+
+    /**
+     * Parse the DTO from its JSON representation and decrypt the stored password.
+     *
+     * @param extParams JSON string of a {@link DorisSinkDTO}
+     * @return the parsed DTO with the password in plain text
+     * @throws BusinessException if parsing or decrypting fails; the message contains the original error message
+     */
+    public static DorisSinkDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, DorisSinkDTO.class).decryptPassword();
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+    /**
+     * Replace a non-empty encrypted password with its decrypted UTF-8 text, using the recorded encrypt version.
+     *
+     * @return this instance
+     * @throws Exception propagated from {@link AESUtils} when decrypting fails
+     */
+    private DorisSinkDTO decryptPassword() throws Exception {
+        if (StringUtils.isNotEmpty(this.password)) {
+            byte[] passwordBytes = AESUtils.decryptAsString(this.password, this.encryptVersion);
+            this.password = new String(passwordBytes, StandardCharsets.UTF_8);
+        }
+        return this;
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkRequest.java
new file mode 100644
index 000000000..9f7bdeecd
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkRequest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.doris;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+
+/**
+ * Doris sink request.
+ *
+ * <p>Request model for a Doris sink; the class is annotated with {@code JsonTypeDefine} using
+ * {@link SinkType#DORIS}. In addition to the properties inherited from {@link SinkRequest} it carries the FE
+ * address, the access credentials, the target table identifier, the stream-load label prefix, the primary key
+ * and the multiple-sink options (enable flag, format, database pattern and table pattern).
+ *
+ * <p>{@code sinkMultipleEnable} is initialized to {@code false} by its field initializer.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Doris sink request")
+@JsonTypeDefine(value = SinkType.DORIS)
+public class DorisSinkRequest extends SinkRequest {
+    @ApiModelProperty("Doris FE http address")
+    private String feNodes;
+
+    @ApiModelProperty("Username for doris accessing")
+    private String username;
+
+    @ApiModelProperty("Password for doris accessing")
+    private String password;
+
+    @ApiModelProperty("Doris table name, such as: db.tbl")
+    private String tableIdentifier;
+
+    @ApiModelProperty("Label prefix for stream loading. Used for guaranteeing Flink EOS semantics, as global unique is "
+            + "needed in 2pc.")
+    private String labelPrefix;
+
+    @ApiModelProperty("The primary key of sink table")
+    private String primaryKey;
+
+    @ApiModelProperty("The multiple enable of sink")
+    private Boolean sinkMultipleEnable = false;
+
+    @ApiModelProperty("The multiple format of sink")
+    private String sinkMultipleFormat;
+
+    @ApiModelProperty("The multiple database-pattern of sink")
+    private String databasePattern;
+
+    @ApiModelProperty("The multiple table-pattern of sink")
+    private String tablePattern;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 2937f70d0..00e6eadf6 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink;
 import org.apache.inlong.manager.pojo.sink.dlciceberg.DLCIcebergSink;
+import org.apache.inlong.manager.pojo.sink.doris.DorisSink;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
 import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSink;
 import org.apache.inlong.manager.pojo.sink.hbase.HBaseSink;
@@ -56,6 +57,7 @@ import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.format.RawFormat;
 import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.DLCIcebergLoadNode;
+import org.apache.inlong.sort.protocol.node.load.DorisLoadNode;
 import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
 import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
@@ -136,6 +138,8 @@ public class LoadNodeUtils {
                 return createLoadNode((TDSQLPostgreSQLSink) streamSink, fieldInfos, fieldRelations, properties);
             case SinkType.DLCICEBERG:
                 return createLoadNode((DLCIcebergSink) streamSink, fieldInfos, fieldRelations, properties);
+            case SinkType.DORIS:
+                return createLoadNode((DorisSink) streamSink, fieldInfos, fieldRelations, properties);
             default:
                 throw new BusinessException(String.format("Unsupported sinkType=%s to create load node", sinkType));
         }
@@ -291,6 +295,47 @@ public class LoadNodeUtils {
         );
     }
 
+    /**
+     * Build the load node for a Doris sink.
+     *
+     * <p>When multiple-sink mode is enabled and a multiple format is configured, the format name is resolved to
+     * a Canal-JSON or Debezium-JSON format and any other data type is rejected. In every other case no format
+     * is handed to the load node.
+     *
+     * @param dorisSink Doris sink configuration
+     * @param fieldInfos fields of the load node
+     * @param fieldRelations field relations of the load node
+     * @param properties properties handed to the load node
+     * @return the Doris load node
+     * @throws IllegalArgumentException if the configured multiple format is neither CANAL nor DEBEZIUM_JSON
+     */
+    public static DorisLoadNode createLoadNode(DorisSink dorisSink, List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations, Map<String, String> properties) {
+        final String sinkName = dorisSink.getSinkName();
+        final Boolean multipleEnable = dorisSink.getSinkMultipleEnable();
+        final String multipleFormat = dorisSink.getSinkMultipleFormat();
+
+        Format format = null;
+        // Boolean.TRUE.equals covers both the null and the false case of the boxed flag.
+        if (Boolean.TRUE.equals(multipleEnable) && StringUtils.isNotBlank(multipleFormat)) {
+            DataTypeEnum dataType = DataTypeEnum.forName(multipleFormat);
+            switch (dataType) {
+                case CANAL:
+                    format = new CanalJsonFormat();
+                    break;
+                case DEBEZIUM_JSON:
+                    format = new DebeziumJsonFormat();
+                    break;
+                default:
+                    throw new IllegalArgumentException(String.format("Unsupported dataType=%s for doris", dataType));
+            }
+        }
+
+        // The sink name is used for the first two constructor arguments.
+        // NOTE(review): neither dorisSink.getPrimaryKey() nor dorisSink.getLabelPrefix() is forwarded; the null
+        // after the table identifier may be the slot for one of them - confirm against DorisLoadNode.
+        return new DorisLoadNode(
+                sinkName,
+                sinkName,
+                fieldInfos,
+                fieldRelations,
+                null,
+                null,
+                null,
+                properties,
+                dorisSink.getFeNodes(),
+                dorisSink.getUsername(),
+                dorisSink.getPassword(),
+                dorisSink.getTableIdentifier(),
+                null,
+                multipleEnable,
+                format,
+                dorisSink.getDatabasePattern(),
+                dorisSink.getTablePattern()
+        );
+    }
+
     /**
      * Create load node of Iceberg.
      */
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
new file mode 100644
index 000000000..0bcb38cab
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.doris;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.doris.DorisSink;
+import org.apache.inlong.manager.pojo.sink.doris.DorisSinkDTO;
+import org.apache.inlong.manager.pojo.sink.doris.DorisSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Sink operator for Doris.
+ *
+ * <p>Converts a Doris sink request into the ext params JSON stored on the sink entity, and rebuilds a
+ * {@link DorisSink} from a stored entity.
+ */
+@Service
+public class DorisSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DorisSinkOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    /**
+     * Tell whether this operator handles the given sink type.
+     *
+     * @param sinkType sink type to check
+     * @return true only for {@link SinkType#DORIS}
+     */
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.DORIS.equals(sinkType);
+    }
+
+    /**
+     * @return the sink type handled by this operator, {@link SinkType#DORIS}
+     */
+    @Override
+    protected String getSinkType() {
+        return SinkType.DORIS;
+    }
+
+    /**
+     * Serialize the Doris-specific part of the request into the ext params of the target entity.
+     *
+     * @param request sink request, whose sink type must equal the type of this operator
+     * @param targetEntity entity receiving the serialized ext params
+     * @throws BusinessException if building or serializing the DTO fails
+     */
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+        final String expectedType = this.getSinkType();
+        Preconditions.checkTrue(expectedType.equals(request.getSinkType()),
+                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + expectedType);
+        // The cast stays outside the try block so that a wrong request class is not reported as a save failure.
+        final DorisSinkRequest dorisRequest = (DorisSinkRequest) request;
+        try {
+            String extParams = objectMapper.writeValueAsString(DorisSinkDTO.getFromRequest(dorisRequest));
+            targetEntity.setExtParams(extParams);
+        } catch (Exception e) {
+            LOGGER.error("parsing json string to sink info failed", e);
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+        }
+    }
+
+    /**
+     * Rebuild the Doris sink from a stored sink entity.
+     *
+     * @param entity stored sink entity; for {@code null} an otherwise empty {@link DorisSink} is returned
+     * @return the sink populated from the entity, its ext params and its sink fields
+     */
+    @Override
+    public StreamSink getFromEntity(@NotNull StreamSinkEntity entity) {
+        if (entity == null) {
+            return new DorisSink();
+        }
+
+        DorisSinkDTO sinkDTO = DorisSinkDTO.getFromJson(entity.getExtParams());
+        Preconditions.checkNotEmpty(sinkDTO.getFeNodes(), "doris fe nodes is empty");
+
+        DorisSink dorisSink = new DorisSink();
+        // Entity properties are copied first, then the DTO properties parsed from the ext params.
+        CommonBeanUtils.copyProperties(entity, dorisSink, true);
+        CommonBeanUtils.copyProperties(sinkDTO, dorisSink, true);
+        List<SinkField> fields = super.getSinkFields(entity.getId());
+        dorisSink.setSinkFieldList(fields);
+        return dorisSink;
+    }
+
+}