You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/27 02:01:30 UTC
[inlong] branch master updated: [INLONG-6232][Manager] Support Apache Doris load node management (#6233)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c0c7adba2 [INLONG-6232][Manager] Support Apache Doris load node management (#6233)
c0c7adba2 is described below
commit c0c7adba2f0fc219fafee19bd22fb3a12d2cdba2
Author: Liao Rui <li...@users.noreply.github.com>
AuthorDate: Thu Oct 27 10:01:24 2022 +0800
[INLONG-6232][Manager] Support Apache Doris load node management (#6233)
Co-authored-by: ryanrliao <ry...@tencent.com>
---
.../inlong/manager/common/consts/SinkType.java | 2 +-
.../inlong/manager/pojo/sink/doris/DorisSink.java | 85 +++++++++++++++
.../manager/pojo/sink/doris/DorisSinkDTO.java | 121 +++++++++++++++++++++
.../manager/pojo/sink/doris/DorisSinkRequest.java | 68 ++++++++++++
.../manager/pojo/sort/util/LoadNodeUtils.java | 45 ++++++++
.../service/sink/doris/DorisSinkOperator.java | 92 ++++++++++++++++
6 files changed, 412 insertions(+), 1 deletion(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index e1bf78245..0832092d2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -36,5 +36,5 @@ public class SinkType {
public static final String ORACLE = "ORACLE";
public static final String TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL";
public static final String DLCICEBERG = "DLCICEBERG";
-
+ public static final String DORIS = "DORIS";
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSink.java
new file mode 100644
index 000000000..f1c777c16
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSink.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.doris;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+/**
+ * Doris sink info.
+ *
+ * <p>Stream sink variant for Doris. On top of what {@link StreamSink} provides it carries the FE node
+ * address, the access credentials, the target table identifier, the stream-load label prefix, the
+ * primary key and the multiple-sink options (enable flag, format, database pattern, table pattern).
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Doris sink info")
+@JsonTypeDefine(value = SinkType.DORIS)
+public class DorisSink extends StreamSink {
+
+ @ApiModelProperty("Doris FE http address")
+ private String feNodes;
+
+ @ApiModelProperty("Username for doris accessing")
+ private String username;
+
+ @ApiModelProperty("Password for doris accessing")
+ private String password;
+
+ @ApiModelProperty("Doris table name, such as: db.tbl")
+ private String tableIdentifier;
+
+ @ApiModelProperty("Label prefix for stream loading. Used for guaranteeing Flink EOS semantics, as global unique is "
+ + "needed in 2pc.")
+ private String labelPrefix;
+
+ @ApiModelProperty("The primary key of sink table")
+ private String primaryKey;
+
+ // NOTE(review): this initializer has no @Builder.Default, so instances created through the
+ // @SuperBuilder builder presumably get null here instead of false - confirm against the Lombok docs.
+ @ApiModelProperty("The multiple enable of sink")
+ private Boolean sinkMultipleEnable = false;
+
+ @ApiModelProperty("The multiple format of sink")
+ private String sinkMultipleFormat;
+
+ @ApiModelProperty("The multiple database-pattern of sink")
+ private String databasePattern;
+
+ @ApiModelProperty("The multiple table-pattern of sink")
+ private String tablePattern;
+
+ /**
+ * Create an empty Doris sink whose sink type is preset to {@link SinkType#DORIS}.
+ *
+ * <p>NOTE(review): the Lombok-generated all-args constructor and builder presumably do not run this
+ * body, so the sink type is only preset on this path - confirm.
+ */
+ public DorisSink() {
+ this.setSinkType(SinkType.DORIS);
+ }
+
+ /**
+ * Generate the sink request for this sink by copying its properties into a new
+ * {@link DorisSinkRequest}.
+ *
+ * @return the object returned by {@code CommonBeanUtils.copyProperties} for a fresh DorisSinkRequest
+ */
+ @Override
+ public SinkRequest genSinkRequest() {
+ return CommonBeanUtils.copyProperties(this, DorisSinkRequest::new);
+ }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
new file mode 100644
index 000000000..60dfb1a7b
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.doris;
+
+import io.swagger.annotations.ApiModelProperty;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.AESUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+/**
+ * Sink info of Doris.
+ *
+ * <p>Holds the Doris-specific sink settings. {@link #getFromRequest(DorisSinkRequest)} builds an
+ * instance from a request with the password AES-encrypted, and {@link #getFromJson(String)} parses an
+ * instance from a JSON string and decrypts the password again.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class DorisSinkDTO {
+
+ @ApiModelProperty("Doris FE http address")
+ private String feNodes;
+
+ @ApiModelProperty("Username for doris accessing")
+ private String username;
+
+ @ApiModelProperty("Password for doris accessing")
+ private String password;
+
+ @ApiModelProperty("Doris table name, such as: db.tbl")
+ private String tableIdentifier;
+
+ @ApiModelProperty("Label prefix for stream loading. Used for guaranteeing Flink EOS semantics, as global unique is "
+ + "needed in 2pc.")
+ private String labelPrefix;
+
+ @ApiModelProperty("The primary key of sink table")
+ private String primaryKey;
+
+ @ApiModelProperty("The multiple enable of sink")
+ private Boolean sinkMultipleEnable = false;
+
+ @ApiModelProperty("The multiple format of sink")
+ private String sinkMultipleFormat;
+
+ @ApiModelProperty("The multiple database-pattern of sink")
+ private String databasePattern;
+
+ @ApiModelProperty("The multiple table-pattern of sink")
+ private String tablePattern;
+
+ @ApiModelProperty("Password encrypt version")
+ private Integer encryptVersion;
+
+ @ApiModelProperty("Properties for doris")
+ private Map<String, Object> properties;
+
+ /**
+ * Get the dto instance from the request.
+ *
+ * <p>Every Doris-specific field of the request is copied. A non-empty password is encrypted with
+ * {@code AESUtils.encryptToString} and the encrypt version used is stored next to it; an empty or
+ * null password is stored as null.
+ *
+ * @param request the Doris sink request to convert
+ * @return a DTO carrying the request's settings with the password encrypted
+ * @throws Exception if obtaining the encrypt version or encrypting the password fails
+ */
+ public static DorisSinkDTO getFromRequest(DorisSinkRequest request) throws Exception {
+ Integer encryptVersion = AESUtils.getCurrentVersion(null);
+ String passwd = null;
+ if (StringUtils.isNotEmpty(request.getPassword())) {
+ passwd = AESUtils.encryptToString(request.getPassword().getBytes(StandardCharsets.UTF_8),
+ encryptVersion);
+ }
+ return DorisSinkDTO.builder()
+ .feNodes(request.getFeNodes())
+ .username(request.getUsername())
+ .password(passwd)
+ .tableIdentifier(request.getTableIdentifier())
+ .labelPrefix(request.getLabelPrefix())
+ // copy the primary key as well; without it the DTO's primaryKey is always null
+ .primaryKey(request.getPrimaryKey())
+ .sinkMultipleEnable(request.getSinkMultipleEnable())
+ .sinkMultipleFormat(request.getSinkMultipleFormat())
+ .databasePattern(request.getDatabasePattern())
+ .tablePattern(request.getTablePattern())
+ .encryptVersion(encryptVersion)
+ .properties(request.getProperties())
+ .build();
+ }
+
+ /**
+ * Get the dto instance from a JSON string and decrypt its password.
+ *
+ * @param extParams JSON string to parse into a DorisSinkDTO
+ * @return the parsed DTO with its password decrypted
+ * @throws BusinessException if parsing or decrypting fails for any reason; the message is the
+ * SINK_INFO_INCORRECT text followed by the original exception's message
+ */
+ public static DorisSinkDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, DorisSinkDTO.class).decryptPassword();
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+ }
+ }
+
+ /**
+ * Replace a non-empty password with its decrypted form, using this DTO's encrypt version.
+ *
+ * @return this DTO, to allow chaining
+ * @throws Exception if {@code AESUtils.decryptAsString} fails
+ */
+ private DorisSinkDTO decryptPassword() throws Exception {
+ if (StringUtils.isNotEmpty(this.password)) {
+ byte[] passwordBytes = AESUtils.decryptAsString(this.password, this.encryptVersion);
+ this.password = new String(passwordBytes, StandardCharsets.UTF_8);
+ }
+ return this;
+ }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkRequest.java
new file mode 100644
index 000000000..9f7bdeecd
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkRequest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.doris;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+
+/**
+ * Doris sink request.
+ *
+ * <p>Request payload for a Doris sink. On top of what {@link SinkRequest} provides it carries the FE
+ * node address, the access credentials, the target table identifier, the stream-load label prefix, the
+ * primary key and the multiple-sink options (enable flag, format, database pattern, table pattern).
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Doris sink request")
+@JsonTypeDefine(value = SinkType.DORIS)
+public class DorisSinkRequest extends SinkRequest {
+ @ApiModelProperty("Doris FE http address")
+ private String feNodes;
+
+ @ApiModelProperty("Username for doris accessing")
+ private String username;
+
+ // Carried as supplied by the caller; DorisSinkDTO.getFromRequest encrypts it when building the DTO.
+ @ApiModelProperty("Password for doris accessing")
+ private String password;
+
+ @ApiModelProperty("Doris table name, such as: db.tbl")
+ private String tableIdentifier;
+
+ @ApiModelProperty("Label prefix for stream loading. Used for guaranteeing Flink EOS semantics, as global unique is "
+ + "needed in 2pc.")
+ private String labelPrefix;
+
+ @ApiModelProperty("The primary key of sink table")
+ private String primaryKey;
+
+ // Defaults to false when the field is left untouched after construction.
+ @ApiModelProperty("The multiple enable of sink")
+ private Boolean sinkMultipleEnable = false;
+
+ @ApiModelProperty("The multiple format of sink")
+ private String sinkMultipleFormat;
+
+ @ApiModelProperty("The multiple database-pattern of sink")
+ private String databasePattern;
+
+ @ApiModelProperty("The multiple table-pattern of sink")
+ private String tablePattern;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 2937f70d0..00e6eadf6 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink;
import org.apache.inlong.manager.pojo.sink.dlciceberg.DLCIcebergSink;
+import org.apache.inlong.manager.pojo.sink.doris.DorisSink;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSink;
import org.apache.inlong.manager.pojo.sink.hbase.HBaseSink;
@@ -56,6 +57,7 @@ import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.format.RawFormat;
import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
import org.apache.inlong.sort.protocol.node.load.DLCIcebergLoadNode;
+import org.apache.inlong.sort.protocol.node.load.DorisLoadNode;
import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
@@ -136,6 +138,8 @@ public class LoadNodeUtils {
return createLoadNode((TDSQLPostgreSQLSink) streamSink, fieldInfos, fieldRelations, properties);
case SinkType.DLCICEBERG:
return createLoadNode((DLCIcebergSink) streamSink, fieldInfos, fieldRelations, properties);
+ case SinkType.DORIS:
+ return createLoadNode((DorisSink) streamSink, fieldInfos, fieldRelations, properties);
default:
throw new BusinessException(String.format("Unsupported sinkType=%s to create load node", sinkType));
}
@@ -291,6 +295,47 @@ public class LoadNodeUtils {
);
}
+ /**
+ * Build the load node of Doris for the given sink.
+ *
+ * <p>A multiple-sink format is resolved only when the sink has multiple-sink enabled and declares a
+ * non-blank format name; otherwise the node is created with a null format.
+ *
+ * @param dorisSink Doris sink supplying the name, connection, table and multiple-sink settings
+ * @param fieldInfos field infos handed to the load node
+ * @param fieldRelations field relations handed to the load node
+ * @param properties properties handed to the load node
+ * @return the Doris load node
+ * @throws IllegalArgumentException if the declared format is neither CANAL nor DEBEZIUM_JSON
+ */
+ public static DorisLoadNode createLoadNode(DorisSink dorisSink, List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations, Map<String, String> properties) {
+ final Format multipleFormat = resolveDorisMultipleFormat(dorisSink);
+ // the sink name is used for both of the first two constructor arguments
+ final String sinkName = dorisSink.getSinkName();
+ // NOTE(review): the nulls are positional constructor arguments whose meaning is not visible here;
+ // confirm against DorisLoadNode, in particular the one following the table identifier.
+ return new DorisLoadNode(sinkName, sinkName, fieldInfos, fieldRelations,
+ null, null, null,
+ properties,
+ dorisSink.getFeNodes(), dorisSink.getUsername(), dorisSink.getPassword(),
+ dorisSink.getTableIdentifier(),
+ null,
+ dorisSink.getSinkMultipleEnable(), multipleFormat,
+ dorisSink.getDatabasePattern(), dorisSink.getTablePattern());
+ }
+
+ /**
+ * Resolve the multiple-sink format of a Doris sink; null when multiple-sink is not enabled or no
+ * format name is set.
+ */
+ private static Format resolveDorisMultipleFormat(DorisSink dorisSink) {
+ if (!Boolean.TRUE.equals(dorisSink.getSinkMultipleEnable())
+ || !StringUtils.isNotBlank(dorisSink.getSinkMultipleFormat())) {
+ return null;
+ }
+ DataTypeEnum dataType = DataTypeEnum.forName(dorisSink.getSinkMultipleFormat());
+ switch (dataType) {
+ case CANAL:
+ return new CanalJsonFormat();
+ case DEBEZIUM_JSON:
+ return new DebeziumJsonFormat();
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported dataType=%s for doris", dataType));
+ }
+ }
+
/**
* Create load node of Iceberg.
*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
new file mode 100644
index 000000000..0bcb38cab
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.doris;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.doris.DorisSink;
+import org.apache.inlong.manager.pojo.sink.doris.DorisSinkDTO;
+import org.apache.inlong.manager.pojo.sink.doris.DorisSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Sink operator for Doris: stores a Doris sink request into a stream sink entity's ext params and
+ * rebuilds a {@link DorisSink} from such an entity.
+ */
+@Service
+public class DorisSinkOperator extends AbstractSinkOperator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DorisSinkOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ /**
+ * Tell whether this operator handles the given sink type.
+ *
+ * @param sinkType sink type to test
+ * @return true only for {@link SinkType#DORIS}
+ */
+ @Override
+ public Boolean accept(String sinkType) {
+ final String supportedType = SinkType.DORIS;
+ return supportedType.equals(sinkType);
+ }
+
+ /**
+ * @return the sink type served by this operator, {@link SinkType#DORIS}
+ */
+ @Override
+ protected String getSinkType() {
+ return SinkType.DORIS;
+ }
+
+ /**
+ * Serialize the Doris-specific part of the request into the entity's ext params.
+ *
+ * @param request sink request; its sink type must equal this operator's sink type
+ * @param targetEntity entity receiving the JSON form of the Doris sink DTO
+ * @throws BusinessException with the SINK_SAVE_FAILED message if building or serializing the DTO fails
+ */
+ @Override
+ protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+ final boolean typeMatches = this.getSinkType().equals(request.getSinkType());
+ final String mismatchMessage = ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType();
+ Preconditions.checkTrue(typeMatches, mismatchMessage);
+
+ // the cast stays outside the try block so a wrong request class is not reported as a save failure
+ DorisSinkRequest dorisRequest = (DorisSinkRequest) request;
+ try {
+ String extParams = objectMapper.writeValueAsString(DorisSinkDTO.getFromRequest(dorisRequest));
+ targetEntity.setExtParams(extParams);
+ } catch (Exception e) {
+ LOGGER.error("parsing json string to sink info failed", e);
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ }
+ }
+
+ /**
+ * Rebuild the Doris sink from a stream sink entity.
+ *
+ * @param entity stream sink entity; a null entity yields an empty Doris sink
+ * @return the Doris sink filled from the entity, its ext params and its sink fields
+ */
+ @Override
+ public StreamSink getFromEntity(@NotNull StreamSinkEntity entity) {
+ if (entity == null) {
+ return new DorisSink();
+ }
+
+ DorisSink dorisSink = new DorisSink();
+ DorisSinkDTO sinkDto = DorisSinkDTO.getFromJson(entity.getExtParams());
+ Preconditions.checkNotEmpty(sinkDto.getFeNodes(), "doris fe nodes is empty");
+
+ // entity values are copied first, then the DTO values parsed from the ext params
+ CommonBeanUtils.copyProperties(entity, dorisSink, true);
+ CommonBeanUtils.copyProperties(sinkDto, dorisSink, true);
+
+ List<SinkField> fieldList = super.getSinkFields(entity.getId());
+ dorisSink.setSinkFieldList(fieldList);
+ return dorisSink;
+ }
+
+}