Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/11/29 09:10:24 UTC

[GitHub] [inlong] fuweng11 opened a new pull request, #6666: [INLONG-6665][Manager] Support to save additional information for Elasticsearch field

fuweng11 opened a new pull request, #6666:
URL: https://github.com/apache/inlong/pull/6666

   
   ### Prepare a Pull Request
   
   - Fixes #6665
   
   ### Motivation
   
   Support saving additional information for Elasticsearch fields.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow merged pull request #6666: [INLONG-6665][Manager] Support to save additional information for Elasticsearch field

Posted by GitBox <gi...@apache.org>.
healchow merged PR #6666:
URL: https://github.com/apache/inlong/pull/6666



[GitHub] [inlong] healchow commented on a diff in pull request #6666: [INLONG-6665][Manager] Support to save additional information for Elasticsearch field

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6666:
URL: https://github.com/apache/inlong/pull/6666#discussion_r1034500966


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java:
##########
@@ -101,4 +107,66 @@ public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<Stri
         return idParams;
     }
 
+    @Override
+    public void saveFieldOpt(SinkRequest request) {
+        List<SinkField> fieldList = request.getSinkFieldList();
+        LOGGER.info("begin to save sink fields={}", fieldList);
+        if (CollectionUtils.isEmpty(fieldList)) {
+            return;
+        }
+
+        int size = fieldList.size();
+        List<StreamSinkFieldEntity> entityList = new ArrayList<>(size);
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sinkType = request.getSinkType();
+        Integer sinkId = request.getId();
+        for (SinkField fieldInfo : fieldList) {
+            this.checkFieldInfo(fieldInfo);
+            StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
+            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+                fieldEntity.setFieldComment(fieldEntity.getFieldName());
+            }
+            try {
+                ElasticsearchFieldInfo dto = ElasticsearchFieldInfo.getFromRequest(fieldInfo);
+                fieldEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            } catch (Exception e) {
+                LOGGER.error("parsing json string to sink field info failed", e);
+                throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            }
+            fieldEntity.setInlongGroupId(groupId);
+            fieldEntity.setInlongStreamId(streamId);
+            fieldEntity.setSinkType(sinkType);
+            fieldEntity.setSinkId(sinkId);
+            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
+            entityList.add(fieldEntity);
+        }
+
+        sinkFieldMapper.insertAll(entityList);
+        LOGGER.info("success to save sink fields");
+    }
+
+    @Override
+    public List<SinkField> getSinkFields(Integer sinkId) {
+        List<StreamSinkFieldEntity> sinkFieldEntities = sinkFieldMapper.selectBySinkId(sinkId);
+        List<SinkField> fieldList = new ArrayList<>();
+        if (CollectionUtils.isEmpty(sinkFieldEntities)) {
+            return fieldList;
+        }
+        sinkFieldEntities.stream().forEach(field -> {

Review Comment:
   Maybe `sinkFieldEntities.forEach()` is enough.
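
The suggestion can be illustrated with a minimal sketch. `List` inherits `forEach` from `Iterable`, so the intermediate `stream()` call adds nothing when the only operation is `forEach`. The class name, method names, and the upper-casing step below are hypothetical stand-ins for the mapping done in `getSinkFields`; they are not taken from the InLong code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ForEachSketch {

    // Form used in the PR: open a stream only to call forEach on it.
    static List<String> viaStream(List<String> names) {
        List<String> out = new ArrayList<>();
        names.stream().forEach(n -> out.add(n.toUpperCase()));
        return out;
    }

    // Suggested form: Iterable.forEach directly on the list.
    static List<String> viaIterable(List<String> names) {
        List<String> out = new ArrayList<>();
        names.forEach(n -> out.add(n.toUpperCase()));
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical field names, for illustration only.
        List<String> names = Arrays.asList("id", "name", "ts");
        // Both forms visit the elements in list order and give equal results.
        System.out.println(viaStream(names).equals(viaIterable(names)));
    }
}
```

For a sequential, ordered list the two forms behave the same here, so the shorter one is usually preferred for readability.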




[GitHub] [inlong] healchow commented on a diff in pull request #6666: [INLONG-6665][Manager] Support to save additional information for Elasticsearch field

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6666:
URL: https://github.com/apache/inlong/pull/6666#discussion_r1034501810


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java:
##########
@@ -101,4 +107,66 @@ public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<Stri
         return idParams;
     }
 
+    @Override
+    public void saveFieldOpt(SinkRequest request) {
+        List<SinkField> fieldList = request.getSinkFieldList();
+        LOGGER.info("begin to save sink fields={}", fieldList);

Review Comment:
   Should "es" be added to this log message?
   
   ```
   begin to save es sink fields xxx
   ```


