You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/01 13:43:54 UTC
[incubator-inlong] branch master updated: [INLONG-4060][Manager] Fix NodeUtils for sort (#4061)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3ff161e24 [INLONG-4060][Manager] Fix NodeUtils for sort (#4061)
3ff161e24 is described below
commit 3ff161e2478f4bb754acc9f8d6893e6974d346bb
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sun May 1 21:43:49 2022 +0800
[INLONG-4060][Manager] Fix NodeUtils for sort (#4061)
---
.../service/sort/util/ExtractNodeUtils.java | 13 +++++++----
.../manager/service/sort/util/LoadNodeUtils.java | 26 ++++++++++++----------
2 files changed, 23 insertions(+), 16 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index ab5d1a0e3..aede87357 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
@@ -92,12 +93,12 @@ public class ExtractNodeUtils {
serverId = binlogSourceResponse.getServerId();
}
String tables = binlogSourceResponse.getTableWhiteList();
- List<String> tableNames = Splitter.on(",").splitToList(tables);
- List<InlongStreamFieldInfo> streamFieldInfos = binlogSourceResponse.getFieldList();
- List<FieldInfo> fieldInfos = streamFieldInfos.stream()
+ final List<String> tableNames = Splitter.on(",").splitToList(tables);
+ final List<InlongStreamFieldInfo> streamFieldInfos = binlogSourceResponse.getFieldList();
+ final List<FieldInfo> fieldInfos = streamFieldInfos.stream()
.map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
.collect(Collectors.toList());
- String serverTimeZone = binlogSourceResponse.getServerTimezone();
+ final String serverTimeZone = binlogSourceResponse.getServerTimezone();
boolean incrementalSnapshotEnabled = true;
// TODO Needs to be configurable for those parameters
@@ -108,6 +109,10 @@ public class ExtractNodeUtils {
properties.put("migrate-all", "true");
}
properties.put("append-mode", "true");
+ if (StringUtils.isEmpty(primaryKey)) {
+ incrementalSnapshotEnabled = false;
+ properties.put("scan.incremental.snapshot.enabled", "false");
+ }
return new MySqlExtractNode(id,
name,
fieldInfos,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 4e1080b8b..1ebd697ad 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -162,17 +162,19 @@ public class LoadNodeUtils {
if (CollectionUtils.isEmpty(sinkFieldResponses)) {
return Lists.newArrayList();
}
- return sinkFieldResponses.stream().map(sinkFieldResponse -> {
- String fieldName = sinkFieldResponse.getFieldName();
- String fieldType = sinkFieldResponse.getFieldType();
- String fieldFormat = sinkFieldResponse.getFieldFormat();
- FieldInfo sinkField = new FieldInfo(fieldName, sinkName,
- FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat));
- String sourceFieldName = sinkFieldResponse.getSourceFieldName();
- String sourceFieldType = sinkFieldResponse.getSourceFieldType();
- FieldInfo sourceField = new FieldInfo(sourceFieldName, sinkName,
- FieldInfoUtils.convertFieldFormat(sourceFieldType));
- return new FieldRelationShip(sourceField, sinkField);
- }).collect(Collectors.toList());
+ return sinkFieldResponses.stream()
+ .filter(sinkFieldResponse -> StringUtils.isNotEmpty(sinkFieldResponse.getSourceFieldName()))
+ .map(sinkFieldResponse -> {
+ String fieldName = sinkFieldResponse.getFieldName();
+ String fieldType = sinkFieldResponse.getFieldType();
+ String fieldFormat = sinkFieldResponse.getFieldFormat();
+ FieldInfo sinkField = new FieldInfo(fieldName, sinkName,
+ FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat));
+ String sourceFieldName = sinkFieldResponse.getSourceFieldName();
+ String sourceFieldType = sinkFieldResponse.getSourceFieldType();
+ FieldInfo sourceField = new FieldInfo(sourceFieldName, sinkName,
+ FieldInfoUtils.convertFieldFormat(sourceFieldType));
+ return new FieldRelationShip(sourceField, sinkField);
+ }).collect(Collectors.toList());
}
}