You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/01 09:01:51 UTC

[incubator-inlong] branch master updated: [INLONG-4473][Manager] Fix DuplicateKeyException when save StreamSinkField (#4475)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b061ac34 [INLONG-4473][Manager] Fix DuplicateKeyException when save StreamSinkField (#4475)
5b061ac34 is described below

commit 5b061ac3433ea0f495707bd2b27d33646308ee14
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed Jun 1 17:01:45 2022 +0800

    [INLONG-4473][Manager] Fix DuplicateKeyException when save StreamSinkField (#4475)
---
 .../manager/service/core/impl/InlongStreamServiceImpl.java    |  1 +
 .../inlong/manager/service/sink/StreamSinkServiceImpl.java    | 11 +++++++++++
 .../inlong/manager/service/sort/util/FieldInfoUtils.java      |  2 +-
 .../manager/service/source/StreamSourceServiceImpl.java       | 11 +++++++++++
 4 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index 03da93075..5a5c93abe 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -605,6 +605,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         if (CollectionUtils.isEmpty(infoList)) {
             return;
         }
+        infoList.stream().forEach(streamField -> streamField.setId(null));
         List<InlongStreamFieldEntity> list = CommonBeanUtils.copyListProperties(infoList,
                 InlongStreamFieldEntity::new);
         for (InlongStreamFieldEntity entity : list) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index c99c94b3b..026461985 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.sink.SinkApproveDTO;
 import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkField;
 import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
@@ -95,6 +96,11 @@ public class StreamSinkServiceImpl implements StreamSinkService {
 
         // According to the sink type, save sink information
         StreamSinkOperation operation = operationFactory.getInstance(SinkType.forType(sinkType));
+        List<SinkField> fields = request.getFieldList();
+        // Remove id in sinkField when save
+        if (CollectionUtils.isNotEmpty(fields)) {
+            fields.stream().forEach(sinkField -> sinkField.setId(null));
+        }
         int id = operation.saveOpt(request, operator);
 
         LOGGER.info("success to save sink info: {}", request);
@@ -197,6 +203,11 @@ public class StreamSinkServiceImpl implements StreamSinkService {
                 throw new BusinessException(String.format(err, sinkName, groupId, streamId));
             }
         }
+        List<SinkField> fields = request.getFieldList();
+        // Remove id in sinkField when save
+        if (CollectionUtils.isNotEmpty(fields)) {
+            fields.stream().forEach(sinkField -> sinkField.setId(null));
+        }
 
         StreamSinkOperation operation = operationFactory.getInstance(SinkType.forType(sinkType));
         operation.updateOpt(request, operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
index e9859592a..f29557a64 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
@@ -119,7 +119,7 @@ public class FieldInfoUtils {
             // TODO The meta field needs to be selectable and cannot be filled in by the user
             return new MetaFieldInfo(fieldName, MetaField.forName(metaFieldName));
         } else {
-            return new FieldInfo(fieldName, convertFieldFormat(fieldType.toLowerCase(), format));
+            return new FieldInfo(fieldName, convertFieldFormat(fieldType, format));
         }
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 8cc655ed4..b3993d071 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -33,6 +33,7 @@ import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -84,6 +85,11 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         // According to the source type, save source information
         String sourceType = request.getSourceType();
         StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
+        // Remove id in sourceField when save
+        List<StreamField> streamFields = request.getFieldList();
+        if (CollectionUtils.isNotEmpty(streamFields)) {
+            streamFields.stream().forEach(streamField -> streamField.setId(null));
+        }
         int id = operation.saveOpt(request, groupEntity.getStatus(), operator);
 
         LOGGER.info("success to save source info: {}", request);
@@ -167,6 +173,11 @@ public class StreamSourceServiceImpl implements StreamSourceService {
 
         String sourceType = request.getSourceType();
         StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
+        // Remove id in sourceField when save
+        List<StreamField> streamFields = request.getFieldList();
+        if (CollectionUtils.isNotEmpty(streamFields)) {
+            streamFields.stream().forEach(streamField -> streamField.setId(null));
+        }
         operation.updateOpt(request, groupEntity.getStatus(), operator);
 
         LOGGER.info("success to update source info: {}", request);