You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/29 11:14:10 UTC
[inlong] branch master updated: [INLONG-6665][Manager] Support to save additional info for the Elasticsearch field (#6666)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 470269eb3 [INLONG-6665][Manager] Support to save additional info for the Elasticsearch field (#6666)
470269eb3 is described below
commit 470269eb36dade56c8cff5453cffd1d3c16e64b5
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Tue Nov 29 19:14:04 2022 +0800
[INLONG-6665][Manager] Support to save additional info for the Elasticsearch field (#6666)
---
.../pojo/sink/es/ElasticsearchFieldInfo.java | 25 +++----
.../service/resource/sink/es/ElasticsearchApi.java | 16 ++---
.../sink/es/ElasticsearchResourceOperator.java | 27 ++++----
.../manager/service/sink/AbstractSinkOperator.java | 3 +-
.../manager/service/sink/StreamSinkOperator.java | 7 ++
.../service/sink/es/ElasticsearchSinkOperator.java | 78 ++++++++++++++++++++--
6 files changed, 117 insertions(+), 39 deletions(-)
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchFieldInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchFieldInfo.java
index 82a280a41..9c952550c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchFieldInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchFieldInfo.java
@@ -19,30 +19,24 @@ package org.apache.inlong.manager.pojo.sink.es;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.SinkField;
import javax.validation.constraints.NotNull;
@Data
-@Builder
@NoArgsConstructor
@AllArgsConstructor
-public class ElasticsearchFieldInfo {
-
- @ApiModelProperty("Elasticsearch Field name")
- private String name;
-
- @ApiModelProperty("Elasticsearch Field type")
- private String type;
-
- @ApiModelProperty("Elasticsearch Field text format")
- private String format;
+@JsonTypeDefine(value = SinkType.ELASTICSEARCH)
+public class ElasticsearchFieldInfo extends SinkField {
@ApiModelProperty("Elasticsearch Analyzer")
private String analyzer;
@@ -53,6 +47,13 @@ public class ElasticsearchFieldInfo {
@ApiModelProperty("Elasticsearch Scaling Factor")
private String scalingFactor;
+    /**
+     * Build the Elasticsearch field dto from a sink field carried by the request.
+     *
+     * @param sinkField sink field whose properties are copied into the dto
+     * @return the ElasticsearchFieldInfo returned by {@code CommonBeanUtils.copyProperties}
+     */
+    public static ElasticsearchFieldInfo getFromRequest(SinkField sinkField) {
+        ElasticsearchFieldInfo fieldInfo =
+                CommonBeanUtils.copyProperties(sinkField, ElasticsearchFieldInfo::new, true);
+        return fieldInfo;
+    }
+
/**
* Get the extra param from the Json
*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
index 6a5ebafe1..41309e6de 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
@@ -116,10 +116,10 @@ public class ElasticsearchApi {
private List<String> getMappingInfo(List<ElasticsearchFieldInfo> fieldsInfo) {
List<String> fieldList = new ArrayList<>();
for (ElasticsearchFieldInfo field : fieldsInfo) {
- StringBuilder fieldStr = new StringBuilder().append(" \"").append(field.getName())
+ StringBuilder fieldStr = new StringBuilder().append(" \"").append(field.getFieldName())
.append("\" : {\n \"type\" : \"")
- .append(field.getType()).append("\"");
- if (field.getType().equals("text")) {
+ .append(field.getFieldType()).append("\"");
+ if (field.getFieldType().equals("text")) {
if (StringUtils.isNotEmpty(field.getAnalyzer())) {
fieldStr.append(",\n \"analyzer\" : \"")
.append(field.getAnalyzer()).append("\"");
@@ -128,12 +128,12 @@ public class ElasticsearchApi {
fieldStr.append(",\n \"search_analyzer\" : \"")
.append(field.getSearchAnalyzer()).append("\"");
}
- } else if (field.getType().equals("date")) {
- if (StringUtils.isNotEmpty(field.getFormat())) {
+ } else if (field.getFieldType().equals("date")) {
+ if (StringUtils.isNotEmpty(field.getFieldFormat())) {
fieldStr.append(",\n \"format\" : \"")
- .append(field.getFormat()).append("\"");
+ .append(field.getFieldFormat()).append("\"");
}
- } else if (field.getType().equals("scaled_float")) {
+ } else if (field.getFieldType().equals("scaled_float")) {
if (StringUtils.isNotEmpty(field.getScalingFactor())) {
fieldStr.append(",\n \"scaling_factor\" : \"")
.append(field.getScalingFactor()).append("\"");
@@ -213,7 +213,7 @@ public class ElasticsearchApi {
Map<String, Object> filedMap = (Map<String, Object>) mapping.get(indexName).getSourceAsMap().get(FIELD_KEY);
for (String key : filedMap.keySet()) {
for (ElasticsearchFieldInfo field : notExistFieldInfos) {
- if (field.getName().equals(key)) {
+ if (field.getFieldName().equals(key)) {
notExistFieldInfos.remove(field);
break;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
index 7d318eaee..c66af6951 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
@@ -20,14 +20,15 @@ package org.apache.inlong.manager.service.resource.sink.es;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO;
-import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
-import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.slf4j.Logger;
@@ -124,16 +125,16 @@ public class ElasticsearchResourceOperator implements SinkResourceOperator {
public List<ElasticsearchFieldInfo> getElasticsearchFieldFromSink(List<StreamSinkFieldEntity> sinkList) {
List<ElasticsearchFieldInfo> esFieldList = new ArrayList<>();
for (StreamSinkFieldEntity fieldEntity : sinkList) {
- ElasticsearchFieldInfo esFieldInfo = new ElasticsearchFieldInfo();
- esFieldInfo.setName(fieldEntity.getFieldName());
- esFieldInfo.setType(fieldEntity.getFieldType());
- esFieldInfo.setFormat(fieldEntity.getFieldFormat());
- ElasticsearchFieldInfo fieldExtParams =
- ElasticsearchFieldInfo.getFromJson(fieldEntity.getExtParams());
- esFieldInfo.setScalingFactor(fieldExtParams.getScalingFactor());
- esFieldInfo.setAnalyzer(fieldExtParams.getAnalyzer());
- esFieldInfo.setSearchAnalyzer(fieldExtParams.getSearchAnalyzer());
- esFieldList.add(esFieldInfo);
+ if (StringUtils.isNotBlank(fieldEntity.getExtParams())) {
+ ElasticsearchFieldInfo elasticsearchFieldInfo = ElasticsearchFieldInfo.getFromJson(
+ fieldEntity.getExtParams());
+ CommonBeanUtils.copyProperties(fieldEntity, elasticsearchFieldInfo, true);
+ esFieldList.add(elasticsearchFieldInfo);
+ } else {
+ ElasticsearchFieldInfo esFieldInfo = new ElasticsearchFieldInfo();
+ CommonBeanUtils.copyProperties(fieldEntity, esFieldInfo, true);
+ esFieldList.add(esFieldInfo);
+ }
}
return esFieldList;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index 76bc46da7..c1331bec1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -168,7 +168,8 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
LOGGER.info("success to update sink field");
}
- protected void saveFieldOpt(SinkRequest request) {
+ @Override
+ public void saveFieldOpt(SinkRequest request) {
List<SinkField> fieldList = request.getSinkFieldList();
LOGGER.info("begin to save sink fields={}", fieldList);
if (CollectionUtils.isEmpty(fieldList)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
index a2d933595..72d06c5cb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
@@ -92,6 +92,13 @@ public interface StreamSinkOperator {
*/
void updateFieldOpt(Boolean onlyAdd, SinkRequest request);
+ /**
+ * Save the sink fields carried by the given request.
+ *
+ * @param request the sink request whose field list needs to be saved
+ */
+ void saveFieldOpt(SinkRequest request);
+
/**
* Delete the sink info.
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
index ad4df6437..926ba2147 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
@@ -18,24 +18,30 @@
package org.apache.inlong.manager.service.sink.es;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkRequest;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -85,7 +91,7 @@ public class ElasticsearchSinkOperator extends AbstractSinkOperator {
ElasticsearchSinkDTO dto = ElasticsearchSinkDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, sink, true);
CommonBeanUtils.copyProperties(dto, sink, true);
- List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+ List<SinkField> sinkFields = getSinkFields(entity.getId());
sink.setSinkFieldList(sinkFields);
return sink;
}
@@ -101,4 +107,66 @@ public class ElasticsearchSinkOperator extends AbstractSinkOperator {
return idParams;
}
+    /**
+     * Save the fields of an Elasticsearch sink.
+     * <p>
+     * Every field of the request is checked with {@code checkFieldInfo}, copied into a
+     * {@link StreamSinkFieldEntity}, and its {@link ElasticsearchFieldInfo} form is serialized
+     * to JSON and stored as the entity's ext params. All entities are then inserted in one batch.
+     * Nothing is inserted when the request carries no fields.
+     *
+     * @param request sink request whose field list needs to be saved
+     * @throws BusinessException if a field cannot be converted to its dto or serialized to JSON
+     */
+    @Override
+    public void saveFieldOpt(SinkRequest request) {
+        List<SinkField> fieldList = request.getSinkFieldList();
+        LOGGER.info("begin to save es sink fields={}", fieldList);
+        if (CollectionUtils.isEmpty(fieldList)) {
+            return;
+        }
+
+        // values shared by every field entity of this sink
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sinkType = request.getSinkType();
+        Integer sinkId = request.getId();
+
+        List<StreamSinkFieldEntity> entityList = new ArrayList<>(fieldList.size());
+        for (SinkField fieldInfo : fieldList) {
+            this.checkFieldInfo(fieldInfo);
+            StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
+            // fall back to the field name when no comment was supplied
+            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+                fieldEntity.setFieldComment(fieldEntity.getFieldName());
+            }
+            try {
+                ElasticsearchFieldInfo dto = ElasticsearchFieldInfo.getFromRequest(fieldInfo);
+                fieldEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            } catch (Exception e) {
+                // this step serializes the field info to JSON (it does not parse JSON);
+                // log the field name so the failing field can be identified
+                LOGGER.error("serializing es sink field info to json string failed, fieldName={}",
+                        fieldEntity.getFieldName(), e);
+                throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            }
+            fieldEntity.setInlongGroupId(groupId);
+            fieldEntity.setInlongStreamId(streamId);
+            fieldEntity.setSinkType(sinkType);
+            fieldEntity.setSinkId(sinkId);
+            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
+            entityList.add(fieldEntity);
+        }
+
+        sinkFieldMapper.insertAll(entityList);
+        LOGGER.info("success to save es sink fields");
+    }
+
+    /**
+     * Query the fields of the given sink.
+     * <p>
+     * A stored field with non-blank ext params is returned as the {@link ElasticsearchFieldInfo}
+     * parsed from those params; any other field is returned as a plain {@link SinkField}.
+     * In both cases the entity's properties are copied onto the returned object.
+     *
+     * @param sinkId id of the sink whose fields are queried
+     * @return the sink fields, or an empty list when none are stored
+     */
+    @Override
+    public List<SinkField> getSinkFields(Integer sinkId) {
+        List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(sinkId);
+        List<SinkField> result = new ArrayList<>();
+        if (CollectionUtils.isEmpty(entities)) {
+            return result;
+        }
+        for (StreamSinkFieldEntity entity : entities) {
+            String extParams = entity.getExtParams();
+            // start from the ES-specific info when ext params exist, otherwise from a plain field
+            SinkField target = StringUtils.isNotBlank(extParams)
+                    ? ElasticsearchFieldInfo.getFromJson(extParams)
+                    : new SinkField();
+            CommonBeanUtils.copyProperties(entity, target, true);
+            result.add(target);
+        }
+        return result;
+    }
+
}