You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/01 13:43:54 UTC

[incubator-inlong] branch master updated: [INLONG-4060][Manager] Fix NodeUtils for sort (#4061)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ff161e24 [INLONG-4060][Manager] Fix NodeUtils for sort (#4061)
3ff161e24 is described below

commit 3ff161e2478f4bb754acc9f8d6893e6974d346bb
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sun May 1 21:43:49 2022 +0800

    [INLONG-4060][Manager] Fix NodeUtils for sort (#4061)
---
 .../service/sort/util/ExtractNodeUtils.java        | 13 +++++++----
 .../manager/service/sort/util/LoadNodeUtils.java   | 26 ++++++++++++----------
 2 files changed, 23 insertions(+), 16 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index ab5d1a0e3..aede87357 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
@@ -92,12 +93,12 @@ public class ExtractNodeUtils {
             serverId = binlogSourceResponse.getServerId();
         }
         String tables = binlogSourceResponse.getTableWhiteList();
-        List<String> tableNames = Splitter.on(",").splitToList(tables);
-        List<InlongStreamFieldInfo> streamFieldInfos = binlogSourceResponse.getFieldList();
-        List<FieldInfo> fieldInfos = streamFieldInfos.stream()
+        final List<String> tableNames = Splitter.on(",").splitToList(tables);
+        final List<InlongStreamFieldInfo> streamFieldInfos = binlogSourceResponse.getFieldList();
+        final List<FieldInfo> fieldInfos = streamFieldInfos.stream()
                 .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
                 .collect(Collectors.toList());
-        String serverTimeZone = binlogSourceResponse.getServerTimezone();
+        final String serverTimeZone = binlogSourceResponse.getServerTimezone();
         boolean incrementalSnapshotEnabled = true;
         
         // TODO Needs to be configurable for those parameters
@@ -108,6 +109,10 @@ public class ExtractNodeUtils {
             properties.put("migrate-all", "true");
         }
         properties.put("append-mode", "true");
+        if (StringUtils.isEmpty(primaryKey)) {
+            incrementalSnapshotEnabled = false;
+            properties.put("scan.incremental.snapshot.enabled", "false");
+        }
         return new MySqlExtractNode(id,
                 name,
                 fieldInfos,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 4e1080b8b..1ebd697ad 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -162,17 +162,19 @@ public class LoadNodeUtils {
         if (CollectionUtils.isEmpty(sinkFieldResponses)) {
             return Lists.newArrayList();
         }
-        return sinkFieldResponses.stream().map(sinkFieldResponse -> {
-            String fieldName = sinkFieldResponse.getFieldName();
-            String fieldType = sinkFieldResponse.getFieldType();
-            String fieldFormat = sinkFieldResponse.getFieldFormat();
-            FieldInfo sinkField = new FieldInfo(fieldName, sinkName,
-                    FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat));
-            String sourceFieldName = sinkFieldResponse.getSourceFieldName();
-            String sourceFieldType = sinkFieldResponse.getSourceFieldType();
-            FieldInfo sourceField = new FieldInfo(sourceFieldName, sinkName,
-                    FieldInfoUtils.convertFieldFormat(sourceFieldType));
-            return new FieldRelationShip(sourceField, sinkField);
-        }).collect(Collectors.toList());
+        return sinkFieldResponses.stream()
+                .filter(sinkFieldResponse -> StringUtils.isNotEmpty(sinkFieldResponse.getSourceFieldName()))
+                .map(sinkFieldResponse -> {
+                    String fieldName = sinkFieldResponse.getFieldName();
+                    String fieldType = sinkFieldResponse.getFieldType();
+                    String fieldFormat = sinkFieldResponse.getFieldFormat();
+                    FieldInfo sinkField = new FieldInfo(fieldName, sinkName,
+                            FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat));
+                    String sourceFieldName = sinkFieldResponse.getSourceFieldName();
+                    String sourceFieldType = sinkFieldResponse.getSourceFieldType();
+                    FieldInfo sourceField = new FieldInfo(sourceFieldName, sinkName,
+                            FieldInfoUtils.convertFieldFormat(sourceFieldType));
+                    return new FieldRelationShip(sourceField, sinkField);
+                }).collect(Collectors.toList());
     }
 }