You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@inlong.apache.org by zi...@apache.org on 2023/11/29 11:00:04 UTC

(inlong) branch master updated: [INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error (#9361)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ff3cf5110a [INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error (#9361)
ff3cf5110a is described below

commit ff3cf5110a9eb9d4149ab6d44604a345e5e67dc1
Author: Sting <zp...@connect.ust.hk>
AuthorDate: Wed Nov 29 18:59:57 2023 +0800

    [INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error (#9361)
---
 .../sort/protocol/node/load/IcebergLoadNode.java     |  5 -----
 .../java/org/apache/inlong/sort/base/Constants.java  |  2 +-
 .../sort/iceberg/FlinkDynamicTableFactory.java       |  2 ++
 .../sink/multiple/DynamicSchemaHandleOperator.java   | 20 +++++++++++++++++---
 4 files changed, 20 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index a418fd930b..9e3fb9e8dd 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -30,7 +30,6 @@ import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
-import org.apache.inlong.sort.util.SchemaChangeUtils;
 
 import com.google.common.base.Preconditions;
 import lombok.Data;
@@ -54,7 +53,6 @@ import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIP
 import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_FORMAT;
 import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_TABLE_PATTERN;
-import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_SCHEMA_CHANGE_POLICIES;
 
 @JsonTypeName("icebergLoad")
 @Data
@@ -177,8 +175,6 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
     public Map<String, String> tableOptions() {
         Map<String, String> options = super.tableOptions();
         options.put(IcebergConstant.CONNECTOR_KEY, IcebergConstant.CONNECTOR);
-        // for test sink.ignore.changelog
-        // options.put("sink.ignore.changelog", "true");
         options.put(IcebergConstant.DATABASE_KEY, dbName);
         options.put(IcebergConstant.TABLE_KEY, tableName);
         options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName);
@@ -197,7 +193,6 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
             options.put(SINK_MULTIPLE_FORMAT, Objects.requireNonNull(sinkMultipleFormat).identifier());
             options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern);
             options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern);
-            options.put(SINK_SCHEMA_CHANGE_POLICIES, SchemaChangeUtils.serialize(policyMap));
         } else {
             options.put(SINK_MULTIPLE_ENABLE, "false");
         }
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index ac7baac4dd..ba47c74356 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -419,7 +419,7 @@ public final class Constants {
     public static final ConfigOption<Boolean> SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT =
             ConfigOptions.key("sink.multiple.auto-create-table-when-snapshot")
                     .booleanType()
-                    .defaultValue(false)
+                    .defaultValue(true)
                     .withDescription("Whether supporting auto create table when snapshot, default value is 'false'");
 
     public static final ConfigOption<String> INNER_FORMAT =
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index bb33e896bd..9a6056e614 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -57,6 +57,7 @@ import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
@@ -327,6 +328,7 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
         options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
         options.add(SINK_MULTIPLE_PK_AUTO_GENERATED);
         options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
+        options.add(SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT);
         options.add(WRITE_COMPACT_ENABLE);
         options.add(WRITE_COMPACT_INTERVAL);
         options.add(WRITE_DISTRIBUTION_MODE);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index e416cfd24f..1a11b1d771 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -60,6 +60,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,6 +81,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
 
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
@@ -522,10 +524,22 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
     }
 
     // =============================== Utils method =================================================================
-    // if newSchema is not same with oldSchema, return false. It include difference in name, type, position, and
-    // quantity
+    // if newSchema is not same with oldSchema, return false.
     private boolean isCompatible(Schema newSchema, Schema oldSchema) {
-        return oldSchema.sameSchema(newSchema);
+        if (newSchema == null) {
+            return false;
+        }
+
+        List<NestedField> oldSchemaFields = oldSchema.columns();
+        List<NestedField> newSchemaFields = newSchema.columns();
+
+        if (oldSchemaFields.size() != newSchemaFields.size()) {
+            return false;
+        }
+
+        return IntStream.range(0, oldSchemaFields.size())
+                .allMatch(i -> oldSchemaFields.get(i).name().equals(newSchemaFields.get(i).name())
+                        && oldSchemaFields.get(i).type() == newSchemaFields.get(i).type());
     }
 
     private TableIdentifier parseId(JsonNode data) throws IOException {