Posted to commits@inlong.apache.org by "healchow (via GitHub)" <gi...@apache.org> on 2023/03/21 10:01:51 UTC

[GitHub] [inlong] healchow commented on a diff in pull request #7664: [INLONG-7663][Manager] Support agent report new fields and automatically issue tasks

healchow commented on code in PR #7664:
URL: https://github.com/apache/inlong/pull/7664#discussion_r1143112386


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java:
##########
@@ -695,6 +704,57 @@ public List<SinkField> parseFields(String fieldsJson) {
         }
     }
 
+    @Override
+    public void addFieldForSink(AddFieldsRequest fieldsRequest, String sourceType, InlongGroupEntity groupEntity,
+            InlongStreamEntity streamEntity) {
+        AtomicBoolean isNeedAddField = new AtomicBoolean(false);

Review Comment:
   There is no need to use `AtomicBoolean` here: a local variable is confined to the calling thread, so thread safety is not a concern.
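
   One caveat: in this diff the flag is set inside a `forEach` lambda, and Java only lets a lambda capture locals that are effectively final, so a plain `boolean` works only if the loop becomes a regular `for` statement. A minimal sketch of that shape follows. `FlagSketch` and `anySizeChanged` are hypothetical names, and the integer lists merely stand in for the before/after field counts of each sink.

```java
import java.util.Arrays;
import java.util.List;

final class FlagSketch {

    // Hypothetical stand-in for the per-sink check in addFieldForSink:
    // returns true if any "after" count differs from its "before" count.
    static boolean anySizeChanged(List<Integer> before, List<Integer> after) {
        boolean changed = false; // plain local, confined to the calling thread
        for (int i = 0; i < before.size(); i++) {
            if (!before.get(i).equals(after.get(i))) {
                changed = true; // legal: no lambda captures this variable
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        System.out.println(anySizeChanged(Arrays.asList(2, 3), Arrays.asList(2, 4)));
    }
}
```

   With an enhanced `for` loop over `sinkEntityList`, the same assignment would compile without any atomic wrapper.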



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java:
##########
@@ -695,6 +704,57 @@ public List<SinkField> parseFields(String fieldsJson) {
         }
     }
 
+    @Override
+    public void addFieldForSink(AddFieldsRequest fieldsRequest, String sourceType, InlongGroupEntity groupEntity,
+            InlongStreamEntity streamEntity) {
+        AtomicBoolean isNeedAddField = new AtomicBoolean(false);
+        String groupId = groupEntity.getInlongGroupId();
+        String streamId = streamEntity.getInlongStreamId();
+        String defaultOperator = groupEntity.getInCharges().split(InlongConstants.COMMA)[0];
+        // add fields for StreamSinkField
+        List<StreamSinkEntity> sinkEntityList = sinkMapper.selectByRelatedId(groupId, streamId);
+        sinkEntityList.forEach(sink -> {
+            String sinkType = sink.getSinkType();
+            List<SinkField> toAddFields = fieldsRequest.getFields().stream().map(streamField -> {
+                SinkField sinkField = new SinkField();
+                sinkField.setSinkType(sink.getSinkType());
+                sinkField.setFieldName(streamField.getFieldName());
+                sinkField.setFieldType(fieldTypeUtils.getSinkField(sinkType, streamField.getFieldType()));
+                sinkField.setFieldComment(streamField.getFieldComment());
+                sinkField.setSourceFieldName(streamField.getFieldName());
+                sinkField.setSourceFieldType(fieldTypeUtils.getStreamField(sourceType, streamField.getFieldType()));
+                return sinkField;
+            }).collect(Collectors.toList());
+            List<StreamSinkFieldEntity> existsFieldList = sinkFieldMapper.selectBySinkId(sink.getId());
+            List<SinkField> sinkFields = new ArrayList<>();
+            if (CollectionUtils.isNotEmpty(existsFieldList)) {
+                sinkFields = CommonBeanUtils.copyListProperties(existsFieldList, SinkField::new);
+            }
+            Set<String> existsNames = sinkFields.stream()
+                    .map(field -> field.getFieldName().toLowerCase(Locale.ROOT))
+                    .collect(Collectors.toSet());
+            for (SinkField fieldInfo : toAddFields) {
+                String tobeAddFieldName = fieldInfo.getFieldName().toLowerCase(Locale.ROOT);
+                if (existsNames.contains(tobeAddFieldName)) {
+                    LOGGER.error("sink field {} already exist for sinkId {}", fieldInfo.getFieldName(), sink.getId());
+                } else {
+                    sinkFields.add(fieldInfo);
+                }
+            }
+            if (sinkFields.size() != existsFieldList.size()) {
+                isNeedAddField.set(true);
+                StreamSink streamSink = this.get(sink.getId());
+                SinkRequest request = streamSink.genSinkRequest();
+                request.setSinkFieldList(sinkFields);
+                this.update(request, defaultOperator);
+            }
+        });
+        boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());

Review Comment:
   This method is too dense. Please add a blank line between blocks of code that do different things to improve readability.



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java:
##########
@@ -695,6 +704,57 @@ public List<SinkField> parseFields(String fieldsJson) {
         }
     }
 
+    @Override
+    public void addFieldForSink(AddFieldsRequest fieldsRequest, String sourceType, InlongGroupEntity groupEntity,
+            InlongStreamEntity streamEntity) {
+        AtomicBoolean isNeedAddField = new AtomicBoolean(false);
+        String groupId = groupEntity.getInlongGroupId();
+        String streamId = streamEntity.getInlongStreamId();
+        String defaultOperator = groupEntity.getInCharges().split(InlongConstants.COMMA)[0];
+        // add fields for StreamSinkField
+        List<StreamSinkEntity> sinkEntityList = sinkMapper.selectByRelatedId(groupId, streamId);
+        sinkEntityList.forEach(sink -> {
+            String sinkType = sink.getSinkType();
+            List<SinkField> toAddFields = fieldsRequest.getFields().stream().map(streamField -> {
+                SinkField sinkField = new SinkField();
+                sinkField.setSinkType(sink.getSinkType());
+                sinkField.setFieldName(streamField.getFieldName());
+                sinkField.setFieldType(fieldTypeUtils.getSinkField(sinkType, streamField.getFieldType()));
+                sinkField.setFieldComment(streamField.getFieldComment());
+                sinkField.setSourceFieldName(streamField.getFieldName());
+                sinkField.setSourceFieldType(fieldTypeUtils.getStreamField(sourceType, streamField.getFieldType()));
+                return sinkField;
+            }).collect(Collectors.toList());
+            List<StreamSinkFieldEntity> existsFieldList = sinkFieldMapper.selectBySinkId(sink.getId());
+            List<SinkField> sinkFields = new ArrayList<>();
+            if (CollectionUtils.isNotEmpty(existsFieldList)) {
+                sinkFields = CommonBeanUtils.copyListProperties(existsFieldList, SinkField::new);

Review Comment:
   Why make a copy? There may be tens or hundreds of fields, and copying every one of them is not cheap.
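
   For the duplicate check at least, the set of existing names could be collected from the entities directly. A rough sketch, assuming a hypothetical `FieldEntity` that exposes only `getFieldName()` in place of `StreamSinkFieldEntity`. Whether the copy can be dropped entirely depends on what the later `update` call needs, which this sketch does not address.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

final class ExistingNamesSketch {

    // Hypothetical minimal entity; the real StreamSinkFieldEntity has more fields.
    static final class FieldEntity {
        private final String fieldName;

        FieldEntity(String fieldName) {
            this.fieldName = fieldName;
        }

        String getFieldName() {
            return fieldName;
        }
    }

    // Collect lower-cased names straight from the entities, with no intermediate bean copy.
    static Set<String> existingNames(List<FieldEntity> entities) {
        return entities.stream()
                .map(entity -> entity.getFieldName().toLowerCase(Locale.ROOT))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<String> names = existingNames(
                Arrays.asList(new FieldEntity("Id"), new FieldEntity("NAME")));
        System.out.println(names.contains("id") && names.contains("name"));
    }
}
```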



##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldTypeUtils.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.util;
+
+import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.FieldTypeMapperInfo;
+import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Util for field type convert.
+ */
+@Data
+@Component
+@ConfigurationProperties(prefix = "type-mapper")
+public class FieldTypeUtils {

Review Comment:
   In my opinion, this utility class should not be instantiated, nor should it be managed by Spring.
   
   Instead, it should read the mapping information from the configuration file each time it is used. That way, once the configuration file is updated, the next call picks up the latest configuration without restarting the Manager service.
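
   A minimal sketch of that idea, not the actual implementation: a static-only class that reloads the mappings on every lookup. It assumes the mappings live in a flat `.properties` file, whereas the class under review imports `YamlPropertiesFactoryBean`, so the mapping file there is presumably YAML. The names `FieldTypeLookupSketch` and `mappedType` are hypothetical.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

final class FieldTypeLookupSketch {

    private FieldTypeLookupSketch() {
        // static utility: never instantiated, not a Spring bean
    }

    // Re-reads the file on every call so edits take effect without a restart.
    // Assumption: mappings are stored as "sourceType=targetType" lines.
    static String mappedType(Path configFile, String sourceType) throws IOException {
        Properties mappings = new Properties();
        try (Reader reader = Files.newBufferedReader(configFile, StandardCharsets.UTF_8)) {
            mappings.load(reader);
        }
        // Fall back to the input type when no mapping is configured.
        return mappings.getProperty(sourceType, sourceType);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("type-mapper", ".properties");
        Files.write(file, "int=INT32\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(mappedType(file, "int"));
        // Simulate an operator editing the file while the service keeps running.
        Files.write(file, "int=BIGINT\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(mappedType(file, "int"));
        Files.delete(file);
    }
}
```

   Reloading on every call trades extra file I/O for freshness. If lookups are frequent, caching the parsed mappings and refreshing them when the file's modification time changes would be a reasonable middle ground.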



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java:
##########
@@ -695,6 +704,57 @@ public List<SinkField> parseFields(String fieldsJson) {
         }
     }
 
+    @Override
+    public void addFieldForSink(AddFieldsRequest fieldsRequest, String sourceType, InlongGroupEntity groupEntity,
+            InlongStreamEntity streamEntity) {
+        AtomicBoolean isNeedAddField = new AtomicBoolean(false);
+        String groupId = groupEntity.getInlongGroupId();
+        String streamId = streamEntity.getInlongStreamId();
+        String defaultOperator = groupEntity.getInCharges().split(InlongConstants.COMMA)[0];
+        // add fields for StreamSinkField
+        List<StreamSinkEntity> sinkEntityList = sinkMapper.selectByRelatedId(groupId, streamId);
+        sinkEntityList.forEach(sink -> {
+            String sinkType = sink.getSinkType();
+            List<SinkField> toAddFields = fieldsRequest.getFields().stream().map(streamField -> {
+                SinkField sinkField = new SinkField();
+                sinkField.setSinkType(sink.getSinkType());
+                sinkField.setFieldName(streamField.getFieldName());
+                sinkField.setFieldType(fieldTypeUtils.getSinkField(sinkType, streamField.getFieldType()));
+                sinkField.setFieldComment(streamField.getFieldComment());
+                sinkField.setSourceFieldName(streamField.getFieldName());
+                sinkField.setSourceFieldType(fieldTypeUtils.getStreamField(sourceType, streamField.getFieldType()));
+                return sinkField;
+            }).collect(Collectors.toList());
+            List<StreamSinkFieldEntity> existsFieldList = sinkFieldMapper.selectBySinkId(sink.getId());
+            List<SinkField> sinkFields = new ArrayList<>();
+            if (CollectionUtils.isNotEmpty(existsFieldList)) {
+                sinkFields = CommonBeanUtils.copyListProperties(existsFieldList, SinkField::new);
+            }
+            Set<String> existsNames = sinkFields.stream()
+                    .map(field -> field.getFieldName().toLowerCase(Locale.ROOT))
+                    .collect(Collectors.toSet());
+            for (SinkField fieldInfo : toAddFields) {
+                String tobeAddFieldName = fieldInfo.getFieldName().toLowerCase(Locale.ROOT);
+                if (existsNames.contains(tobeAddFieldName)) {
+                    LOGGER.error("sink field {} already exist for sinkId {}", fieldInfo.getFieldName(), sink.getId());

Review Comment:
   Should we throw an exception here and skip the remaining operations?
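
   If failing fast is the desired behavior, the check could look roughly like the sketch below. `IllegalArgumentException` is only a stand-in, since the project's own exception type does not appear in this diff, and `DuplicateFieldSketch` and `requireNoDuplicates` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

final class DuplicateFieldSketch {

    // Fail fast: reject the whole request as soon as one name already exists.
    // Assumption: existingLowerCaseNames was built with toLowerCase(Locale.ROOT).
    static void requireNoDuplicates(Set<String> existingLowerCaseNames, List<String> toAdd) {
        for (String name : toAdd) {
            if (existingLowerCaseNames.contains(name.toLowerCase(Locale.ROOT))) {
                throw new IllegalArgumentException("sink field " + name + " already exists");
            }
        }
    }

    public static void main(String[] args) {
        Set<String> existing = new HashSet<>(Arrays.asList("id", "name"));
        requireNoDuplicates(existing, Arrays.asList("age")); // no duplicate, returns normally
        try {
            requireNoDuplicates(existing, Arrays.asList("Name"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Running the check before any sink is modified would avoid leaving some sinks updated and others not when a duplicate turns up partway through.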


