You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2023/11/29 11:00:04 UTC
(inlong) branch master updated: [INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error (#9361)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ff3cf5110a [INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error (#9361)
ff3cf5110a is described below
commit ff3cf5110a9eb9d4149ab6d44604a345e5e67dc1
Author: Sting <zp...@connect.ust.hk>
AuthorDate: Wed Nov 29 18:59:57 2023 +0800
[INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error (#9361)
---
.../sort/protocol/node/load/IcebergLoadNode.java | 5 -----
.../java/org/apache/inlong/sort/base/Constants.java | 2 +-
.../sort/iceberg/FlinkDynamicTableFactory.java | 2 ++
.../sink/multiple/DynamicSchemaHandleOperator.java | 20 +++++++++++++++++---
4 files changed, 20 insertions(+), 9 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index a418fd930b..9e3fb9e8dd 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -30,7 +30,6 @@ import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
-import org.apache.inlong.sort.util.SchemaChangeUtils;
import com.google.common.base.Preconditions;
import lombok.Data;
@@ -54,7 +53,6 @@ import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIP
import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_FORMAT;
import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_TABLE_PATTERN;
-import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_SCHEMA_CHANGE_POLICIES;
@JsonTypeName("icebergLoad")
@Data
@@ -177,8 +175,6 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
options.put(IcebergConstant.CONNECTOR_KEY, IcebergConstant.CONNECTOR);
- // for test sink.ignore.changelog
- // options.put("sink.ignore.changelog", "true");
options.put(IcebergConstant.DATABASE_KEY, dbName);
options.put(IcebergConstant.TABLE_KEY, tableName);
options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName);
@@ -197,7 +193,6 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
options.put(SINK_MULTIPLE_FORMAT, Objects.requireNonNull(sinkMultipleFormat).identifier());
options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern);
options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern);
- options.put(SINK_SCHEMA_CHANGE_POLICIES, SchemaChangeUtils.serialize(policyMap));
} else {
options.put(SINK_MULTIPLE_ENABLE, "false");
}
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index ac7baac4dd..ba47c74356 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -419,7 +419,7 @@ public final class Constants {
public static final ConfigOption<Boolean> SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT =
ConfigOptions.key("sink.multiple.auto-create-table-when-snapshot")
.booleanType()
- .defaultValue(false)
+ .defaultValue(true)
- .withDescription("Whether supporting auto create table when snapshot, default value is 'false'");
+ .withDescription("Whether supporting auto create table when snapshot, default value is 'true'");
public static final ConfigOption<String> INNER_FORMAT =
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index bb33e896bd..9a6056e614 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -57,6 +57,7 @@ import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
@@ -327,6 +328,7 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
options.add(SINK_MULTIPLE_PK_AUTO_GENERATED);
options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
+ options.add(SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT);
options.add(WRITE_COMPACT_ENABLE);
options.add(WRITE_COMPACT_INTERVAL);
options.add(WRITE_DISTRIBUTION_MODE);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index e416cfd24f..1a11b1d771 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -60,6 +60,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +81,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
@@ -522,10 +524,22 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
}
// =============================== Utils method =================================================================
- // if newSchema is not same with oldSchema, return false. It include difference in name, type, position, and
- // quantity
+ // Returns true only if both schemas have the same top-level columns, in order, matching by name and type.
private boolean isCompatible(Schema newSchema, Schema oldSchema) {
- return oldSchema.sameSchema(newSchema);
+ if (newSchema == null) {
+ return false;
+ }
+
+ List<NestedField> oldSchemaFields = oldSchema.columns();
+ List<NestedField> newSchemaFields = newSchema.columns();
+
+ if (oldSchemaFields.size() != newSchemaFields.size()) {
+ return false;
+ }
+
+ return IntStream.range(0, oldSchemaFields.size())
+ .allMatch(i -> oldSchemaFields.get(i).name().equals(newSchemaFields.get(i).name())
+ && oldSchemaFields.get(i).type().equals(newSchemaFields.get(i).type()));
}
private TableIdentifier parseId(JsonNode data) throws IOException {