You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/29 11:14:10 UTC

[inlong] branch master updated: [INLONG-6665][Manager] Support to save additional info for the Elasticsearch field (#6666)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 470269eb3 [INLONG-6665][Manager] Support to save additional info for the Elasticsearch field (#6666)
470269eb3 is described below

commit 470269eb36dade56c8cff5453cffd1d3c16e64b5
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Tue Nov 29 19:14:04 2022 +0800

    [INLONG-6665][Manager] Support to save additional info for the Elasticsearch field (#6666)
---
 .../pojo/sink/es/ElasticsearchFieldInfo.java       | 25 +++----
 .../service/resource/sink/es/ElasticsearchApi.java | 16 ++---
 .../sink/es/ElasticsearchResourceOperator.java     | 27 ++++----
 .../manager/service/sink/AbstractSinkOperator.java |  3 +-
 .../manager/service/sink/StreamSinkOperator.java   |  7 ++
 .../service/sink/es/ElasticsearchSinkOperator.java | 78 ++++++++++++++++++++--
 6 files changed, 117 insertions(+), 39 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchFieldInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchFieldInfo.java
index 82a280a41..9c952550c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchFieldInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchFieldInfo.java
@@ -19,30 +19,24 @@ package org.apache.inlong.manager.pojo.sink.es;
 
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.SinkField;
 
 import javax.validation.constraints.NotNull;
 
 @Data
-@Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class ElasticsearchFieldInfo {
-
-    @ApiModelProperty("Elasticsearch Field name")
-    private String name;
-
-    @ApiModelProperty("Elasticsearch Field type")
-    private String type;
-
-    @ApiModelProperty("Elasticsearch Field text format")
-    private String format;
+@JsonTypeDefine(value = SinkType.ELASTICSEARCH)
+public class ElasticsearchFieldInfo extends SinkField {
 
     @ApiModelProperty("Elasticsearch Analyzer")
     private String analyzer;
@@ -53,6 +47,13 @@ public class ElasticsearchFieldInfo {
     @ApiModelProperty("Elasticsearch Scaling Factor")
     private String scalingFactor;
 
+    /** Create an ElasticsearchFieldInfo populated from the given sink field of the request. */
+    public static ElasticsearchFieldInfo getFromRequest(SinkField sinkField) {
+        ElasticsearchFieldInfo esFieldInfo = CommonBeanUtils.copyProperties(
+                sinkField, ElasticsearchFieldInfo::new, true);
+        return esFieldInfo;
+    }
+
     /**
      * Get the extra param from the Json
      */
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
index 6a5ebafe1..41309e6de 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
@@ -116,10 +116,10 @@ public class ElasticsearchApi {
     private List<String> getMappingInfo(List<ElasticsearchFieldInfo> fieldsInfo) {
         List<String> fieldList = new ArrayList<>();
         for (ElasticsearchFieldInfo field : fieldsInfo) {
-            StringBuilder fieldStr = new StringBuilder().append("        \"").append(field.getName())
+            StringBuilder fieldStr = new StringBuilder().append("        \"").append(field.getFieldName())
                     .append("\" : {\n          \"type\" : \"")
-                    .append(field.getType()).append("\"");
-            if (field.getType().equals("text")) {
+                    .append(field.getFieldType()).append("\"");
+            if (field.getFieldType().equals("text")) {
                 if (StringUtils.isNotEmpty(field.getAnalyzer())) {
                     fieldStr.append(",\n          \"analyzer\" : \"")
                             .append(field.getAnalyzer()).append("\"");
@@ -128,12 +128,12 @@ public class ElasticsearchApi {
                     fieldStr.append(",\n          \"search_analyzer\" : \"")
                             .append(field.getSearchAnalyzer()).append("\"");
                 }
-            } else if (field.getType().equals("date")) {
-                if (StringUtils.isNotEmpty(field.getFormat())) {
+            } else if (field.getFieldType().equals("date")) {
+                if (StringUtils.isNotEmpty(field.getFieldFormat())) {
                     fieldStr.append(",\n          \"format\" : \"")
-                            .append(field.getFormat()).append("\"");
+                            .append(field.getFieldFormat()).append("\"");
                 }
-            } else if (field.getType().equals("scaled_float")) {
+            } else if (field.getFieldType().equals("scaled_float")) {
                 if (StringUtils.isNotEmpty(field.getScalingFactor())) {
                     fieldStr.append(",\n          \"scaling_factor\" : \"")
                             .append(field.getScalingFactor()).append("\"");
@@ -213,7 +213,7 @@ public class ElasticsearchApi {
         Map<String, Object> filedMap = (Map<String, Object>) mapping.get(indexName).getSourceAsMap().get(FIELD_KEY);
         for (String key : filedMap.keySet()) {
             for (ElasticsearchFieldInfo field : notExistFieldInfos) {
-                if (field.getName().equals(key)) {
+                if (field.getFieldName().equals(key)) {
                     notExistFieldInfos.remove(field);
                     break;
                 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
index 7d318eaee..c66af6951 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
@@ -20,14 +20,15 @@ package org.apache.inlong.manager.service.resource.sink.es;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO;
-import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
-import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.slf4j.Logger;
@@ -124,16 +125,16 @@ public class ElasticsearchResourceOperator implements SinkResourceOperator {
     public List<ElasticsearchFieldInfo> getElasticsearchFieldFromSink(List<StreamSinkFieldEntity> sinkList) {
         List<ElasticsearchFieldInfo> esFieldList = new ArrayList<>();
         for (StreamSinkFieldEntity fieldEntity : sinkList) {
-            ElasticsearchFieldInfo esFieldInfo = new ElasticsearchFieldInfo();
-            esFieldInfo.setName(fieldEntity.getFieldName());
-            esFieldInfo.setType(fieldEntity.getFieldType());
-            esFieldInfo.setFormat(fieldEntity.getFieldFormat());
-            ElasticsearchFieldInfo fieldExtParams =
-                    ElasticsearchFieldInfo.getFromJson(fieldEntity.getExtParams());
-            esFieldInfo.setScalingFactor(fieldExtParams.getScalingFactor());
-            esFieldInfo.setAnalyzer(fieldExtParams.getAnalyzer());
-            esFieldInfo.setSearchAnalyzer(fieldExtParams.getSearchAnalyzer());
-            esFieldList.add(esFieldInfo);
+            if (StringUtils.isNotBlank(fieldEntity.getExtParams())) {
+                ElasticsearchFieldInfo elasticsearchFieldInfo = ElasticsearchFieldInfo.getFromJson(
+                        fieldEntity.getExtParams());
+                CommonBeanUtils.copyProperties(fieldEntity, elasticsearchFieldInfo, true);
+                esFieldList.add(elasticsearchFieldInfo);
+            } else {
+                ElasticsearchFieldInfo esFieldInfo = new ElasticsearchFieldInfo();
+                CommonBeanUtils.copyProperties(fieldEntity, esFieldInfo, true);
+                esFieldList.add(esFieldInfo);
+            }
         }
         return esFieldList;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index 76bc46da7..c1331bec1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -168,7 +168,8 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
         LOGGER.info("success to update sink field");
     }
 
-    protected void saveFieldOpt(SinkRequest request) {
+    @Override
+    public void saveFieldOpt(SinkRequest request) {
         List<SinkField> fieldList = request.getSinkFieldList();
         LOGGER.info("begin to save sink fields={}", fieldList);
         if (CollectionUtils.isEmpty(fieldList)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
index a2d933595..72d06c5cb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
@@ -92,6 +92,13 @@ public interface StreamSinkOperator {
      */
     void updateFieldOpt(Boolean onlyAdd, SinkRequest request);
 
+    /**
+     * Save the sink fields.
+     *
+     * @param request sink request info needs to save
+     */
+    void saveFieldOpt(SinkRequest request);
+
     /**
      * Delete the sink info.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
index ad4df6437..926ba2147 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
@@ -18,24 +18,30 @@
 package org.apache.inlong.manager.service.sink.es;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkRequest;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -85,7 +91,7 @@ public class ElasticsearchSinkOperator extends AbstractSinkOperator {
         ElasticsearchSinkDTO dto = ElasticsearchSinkDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, sink, true);
         CommonBeanUtils.copyProperties(dto, sink, true);
-        List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+        List<SinkField> sinkFields = getSinkFields(entity.getId());
         sink.setSinkFieldList(sinkFields);
         return sink;
     }
@@ -101,4 +107,66 @@ public class ElasticsearchSinkOperator extends AbstractSinkOperator {
         return idParams;
     }
 
+    /** Save the es sink fields; each field is also serialized to JSON and kept in the entity ext params. */
+    @Override
+    public void saveFieldOpt(SinkRequest request) {
+        List<SinkField> fieldList = request.getSinkFieldList();
+        LOGGER.info("begin to save es sink fields={}", fieldList);
+        if (CollectionUtils.isEmpty(fieldList)) {
+            return;
+        }
+
+        List<StreamSinkFieldEntity> entityList = new ArrayList<>(fieldList.size());
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sinkType = request.getSinkType();
+        Integer sinkId = request.getId();
+        for (SinkField fieldInfo : fieldList) {
+            this.checkFieldInfo(fieldInfo);
+            StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
+            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+                fieldEntity.setFieldComment(fieldEntity.getFieldName());
+            }
+            try {
+                ElasticsearchFieldInfo dto = ElasticsearchFieldInfo.getFromRequest(fieldInfo);
+                fieldEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            } catch (Exception e) {
+                LOGGER.error("serializing es sink field info to json string failed, field={}", fieldInfo, e);
+                throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            }
+            fieldEntity.setInlongGroupId(groupId);
+            fieldEntity.setInlongStreamId(streamId);
+            fieldEntity.setSinkType(sinkType);
+            fieldEntity.setSinkId(sinkId);
+            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
+            entityList.add(fieldEntity);
+        }
+
+        sinkFieldMapper.insertAll(entityList);
+        LOGGER.info("success to save es sink fields");
+    }
+
+    /**
+     * Get the fields of the given sink, as ElasticsearchFieldInfo when ext params were saved for the field.
+     */
+    @Override
+    public List<SinkField> getSinkFields(Integer sinkId) {
+        List<StreamSinkFieldEntity> sinkFieldEntities = sinkFieldMapper.selectBySinkId(sinkId);
+        List<SinkField> fieldList = new ArrayList<>();
+        if (CollectionUtils.isEmpty(sinkFieldEntities)) {
+            return fieldList;
+        }
+        for (StreamSinkFieldEntity entity : sinkFieldEntities) {
+            String extParams = entity.getExtParams();
+            // fields stored without ext params are returned as a plain SinkField
+            SinkField sinkField = StringUtils.isNotBlank(extParams)
+                    ? ElasticsearchFieldInfo.getFromJson(extParams)
+                    : new SinkField();
+            // the entity properties are copied onto the object after the ext params were parsed
+            CommonBeanUtils.copyProperties(entity, sinkField, true);
+            fieldList.add(sinkField);
+        }
+        return fieldList;
+    }
+
 }