Posted to commits@inlong.apache.org by zi...@apache.org on 2022/10/25 08:23:11 UTC
[inlong] branch master updated: [INLONG-6274][Sort] Support multiple sink for IcebergLoadNode (#6215)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f8b1efa7a [INLONG-6274][Sort] Support multiple sink for IcebergLoadNode (#6215)
f8b1efa7a is described below
commit f8b1efa7aeea207bb6a48b57833c990c96cee28b
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Tue Oct 25 16:23:05 2022 +0800
[INLONG-6274][Sort] Support multiple sink for IcebergLoadNode (#6215)
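End to end, this change lets one Iceberg load node fan records out to many databases and tables.
A minimal sketch of how a job might enable the mode from Flink SQL follows; the connector
identifier and the option key strings are assumptions inferred from the SINK_MULTIPLE_* constants
added below, not keys verified against this commit:

// Sketch only: enabling multiple-sink mode on an Iceberg sink table.
// 'iceberg-inlong' and all 'sink.multiple.*' keys are assumed names.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IcebergMultipleSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // single binary column: the operator below reads the raw payload via getBinary(0)
        tEnv.executeSql(
                "CREATE TABLE iceberg_whole_db_sink (raw BYTES) WITH (\n"
                + "  'connector' = 'iceberg-inlong',\n"
                + "  'catalog-name' = 'hive_catalog',\n"
                + "  'sink.multiple.enable' = 'true',\n"
                + "  'sink.multiple.format' = 'canal-json',\n"
                + "  'sink.multiple.database-pattern' = '${database}',\n"
                + "  'sink.multiple.table-pattern' = '${table}'\n"
                + ")");
    }
}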
---
.../sort/base/format/JsonDynamicSchemaFormat.java | 2 +-
inlong-sort/sort-connectors/iceberg/pom.xml | 8 +
.../sort/iceberg/FlinkDynamicTableFactory.java | 46 +-
.../inlong/sort/iceberg/FlinkTypeToType.java | 201 ++++++
.../inlong/sort/iceberg/IcebergTableSink.java | 55 +-
.../inlong/sort/iceberg/sink/DeltaManifests.java | 14 +-
.../iceberg/sink/DeltaManifestsSerializer.java | 4 +-
.../sort/iceberg/sink/FlinkManifestUtil.java | 12 +-
.../apache/inlong/sort/iceberg/sink/FlinkSink.java | 134 +++-
.../iceberg/sink/ManifestOutputFileFactory.java | 4 +-
.../iceberg/sink/RowDataTaskWriterFactory.java | 3 +-
.../sink/multiple/DynamicSchemaHandleOperator.java | 293 ++++++++
.../multiple/IcebergMultipleFilesCommiter.java | 106 +++
.../sink/multiple/IcebergMultipleStreamWriter.java | 207 ++++++
.../sink/multiple/IcebergProcessFunction.java | 107 +++
.../sink/multiple/IcebergProcessOperator.java | 144 ++++
.../IcebergSingleFileCommiter.java} | 785 +++++++++++----------
.../IcebergSingleStreamWriter.java} | 353 +++++----
.../iceberg/sink/multiple/MultipleWriteResult.java | 41 ++
.../iceberg/sink/multiple/RecordWithSchema.java | 171 +++++
.../iceberg/sink/multiple/SchemaChangeUtils.java | 136 ++++
.../sink/multiple/SchemaEvolutionFunction.java | 23 +
22 files changed, 2229 insertions(+), 620 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index 3ad9117f7..f2527d253 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -96,7 +96,7 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
protected JsonDynamicSchemaFormat() {
this.rowDataConverters =
- new JsonToRowDataConverters(true, false, TimestampFormat.SQL);
+ new JsonToRowDataConverters(true, false, TimestampFormat.ISO_8601);
}
/**
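The practical effect of the TimestampFormat switch above is which textual timestamp shape the
JSON converters accept: SQL-style literals use a space separator, ISO-8601 literals use a 'T'. A
standalone illustration of the two shapes with plain java.time (not Flink internals):

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampShapes {
    public static void main(String[] args) {
        // ISO-8601 separator is 'T'; the SQL style uses a space.
        LocalDateTime iso = LocalDateTime.parse(
                "2022-10-25T16:23:05", DateTimeFormatter.ISO_LOCAL_DATE_TIME);
        LocalDateTime sql = LocalDateTime.parse(
                "2022-10-25 16:23:05", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        System.out.println(iso.equals(sql)); // true: same instant, different text shape
    }
}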
diff --git a/inlong-sort/sort-connectors/iceberg/pom.xml b/inlong-sort/sort-connectors/iceberg/pom.xml
index 115fc0a69..b5ade006f 100644
--- a/inlong-sort/sort-connectors/iceberg/pom.xml
+++ b/inlong-sort/sort-connectors/iceberg/pom.xml
@@ -101,6 +101,14 @@
</includes>
</filter>
</filters>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.inlong.sort.base</pattern>
+ <shadedPattern>
+ org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.base
+ </shadedPattern>
+ </relocation>
+ </relocations>
</configuration>
</execution>
</executions>
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index f12bd1b56..5d63d8dc9 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -39,6 +39,7 @@ import org.apache.flink.util.Preconditions;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.IcebergTableSource;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -49,6 +50,12 @@ import java.util.Set;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ADD_COLUMN_POLICY;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DEL_COLUMN_POLICY;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
/**
@@ -109,6 +116,17 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
return GET_CATALOG_TABLE.invoke(context);
}
+ private static CatalogLoader createCatalogLoader(Map<String, String> tableProps) {
+ Configuration flinkConf = new Configuration();
+ tableProps.forEach(flinkConf::setString);
+
+ String catalogName = flinkConf.getString(CATALOG_NAME);
+ Preconditions.checkNotNull(catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
+
+ org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
+ return FlinkCatalogFactory.createCatalogLoader(catalogName, tableProps, hadoopConf);
+ }
+
private static TableLoader createTableLoader(CatalogBaseTable catalogBaseTable,
Map<String, String> tableProps,
String databaseName,
@@ -185,15 +203,21 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
Map<String, String> tableProps = catalogTable.getOptions();
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
- TableLoader tableLoader;
- if (catalog != null) {
- tableLoader = createTableLoader(catalog, objectPath);
+ boolean multipleSink = Boolean.parseBoolean(
+ tableProps.getOrDefault(SINK_MULTIPLE_ENABLE.key(), SINK_MULTIPLE_ENABLE.defaultValue().toString()));
+ if (multipleSink) {
+ CatalogLoader catalogLoader = createCatalogLoader(tableProps);
+ return new IcebergTableSink(null, tableSchema, catalogTable, catalogLoader);
} else {
- tableLoader = createTableLoader(catalogTable, tableProps, objectPath.getDatabaseName(),
- objectPath.getObjectName());
+ TableLoader tableLoader;
+ if (catalog != null) {
+ tableLoader = createTableLoader(catalog, objectPath);
+ } else {
+ tableLoader = createTableLoader(catalogTable, tableProps, objectPath.getDatabaseName(),
+ objectPath.getObjectName());
+ }
+ return new IcebergTableSink(tableLoader, tableSchema, catalogTable, null);
}
-
- return new IcebergTableSink(tableLoader, tableSchema, catalogTable);
}
@Override
@@ -212,6 +236,13 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
options.add(ICEBERG_IGNORE_ALL_CHANGELOG);
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
+
+ options.add(SINK_MULTIPLE_ENABLE);
+ options.add(SINK_MULTIPLE_FORMAT);
+ options.add(SINK_MULTIPLE_DATABASE_PATTERN);
+ options.add(SINK_MULTIPLE_TABLE_PATTERN);
+ options.add(SINK_MULTIPLE_ADD_COLUMN_POLICY);
+ options.add(SINK_MULTIPLE_DEL_COLUMN_POLICY);
return options;
}
@@ -220,3 +251,4 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
return FACTORY_IDENTIFIER;
}
}
+
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkTypeToType.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkTypeToType.java
new file mode 100644
index 000000000..f18d99b3d
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkTypeToType.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.iceberg.flink.FlinkTypeVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FlinkTypeToType extends FlinkTypeVisitor<Type> {
+
+ private final RowType root;
+ private int nextId;
+
+ public FlinkTypeToType(RowType root) {
+ this.root = root;
+ // the root struct's fields use the first ids
+ this.nextId = root.getFieldCount();
+ }
+
+ private int getNextId() {
+ int next = nextId;
+ nextId += 1;
+ return next;
+ }
+
+ @Override
+ public Type visit(CharType charType) {
+ return Types.StringType.get();
+ }
+
+ @Override
+ public Type visit(VarCharType varCharType) {
+ return Types.StringType.get();
+ }
+
+ @Override
+ public Type visit(BooleanType booleanType) {
+ return Types.BooleanType.get();
+ }
+
+ @Override
+ public Type visit(BinaryType binaryType) {
+ return Types.FixedType.ofLength(binaryType.getLength());
+ }
+
+ @Override
+ public Type visit(VarBinaryType varBinaryType) {
+ return Types.BinaryType.get();
+ }
+
+ @Override
+ public Type visit(DecimalType decimalType) {
+ return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
+ }
+
+ @Override
+ public Type visit(TinyIntType tinyIntType) {
+ return Types.IntegerType.get();
+ }
+
+ @Override
+ public Type visit(SmallIntType smallIntType) {
+ return Types.IntegerType.get();
+ }
+
+ @Override
+ public Type visit(IntType intType) {
+ return Types.IntegerType.get();
+ }
+
+ @Override
+ public Type visit(BigIntType bigIntType) {
+ return Types.LongType.get();
+ }
+
+ @Override
+ public Type visit(FloatType floatType) {
+ return Types.FloatType.get();
+ }
+
+ @Override
+ public Type visit(DoubleType doubleType) {
+ return Types.DoubleType.get();
+ }
+
+ @Override
+ public Type visit(DateType dateType) {
+ return Types.DateType.get();
+ }
+
+ @Override
+ public Type visit(TimeType timeType) {
+ return Types.TimeType.get();
+ }
+
+ @Override
+ public Type visit(TimestampType timestampType) {
+ return Types.TimestampType.withoutZone();
+ }
+
+ @Override
+ public Type visit(LocalZonedTimestampType localZonedTimestampType) {
+ return Types.TimestampType.withZone();
+ }
+
+ @Override
+ public Type visit(ArrayType arrayType) {
+ Type elementType = arrayType.getElementType().accept(this);
+ if (arrayType.getElementType().isNullable()) {
+ return Types.ListType.ofOptional(getNextId(), elementType);
+ } else {
+ return Types.ListType.ofRequired(getNextId(), elementType);
+ }
+ }
+
+ @Override
+ public Type visit(MultisetType multisetType) {
+ Type elementType = multisetType.getElementType().accept(this);
+ return Types.MapType.ofRequired(getNextId(), getNextId(), elementType, Types.IntegerType.get());
+ }
+
+ @Override
+ public Type visit(MapType mapType) {
+ // keys in map are not allowed to be null.
+ Type keyType = mapType.getKeyType().accept(this);
+ Type valueType = mapType.getValueType().accept(this);
+ if (mapType.getValueType().isNullable()) {
+ return Types.MapType.ofOptional(getNextId(), getNextId(), keyType, valueType);
+ } else {
+ return Types.MapType.ofRequired(getNextId(), getNextId(), keyType, valueType);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("ReferenceEquality")
+ public Type visit(RowType rowType) {
+ List<NestedField> newFields = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
+ boolean isRoot = root == rowType;
+
+ List<Type> types = rowType.getFields().stream()
+ .map(f -> f.getType().accept(this))
+ .collect(Collectors.toList());
+
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ int id = isRoot ? i : getNextId();
+
+ RowType.RowField field = rowType.getFields().get(i);
+ String name = field.getName();
+ String comment = field.getDescription().orElse(null);
+
+ if (field.getType().isNullable()) {
+ newFields.add(Types.NestedField.optional(id, name, types.get(i), comment));
+ } else {
+ newFields.add(Types.NestedField.required(id, name, types.get(i), comment));
+ }
+ }
+
+ return Types.StructType.of(newFields);
+ }
+}
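A usage sketch for the visitor above: converting a Flink RowType into an Iceberg Schema through
LogicalType.accept, mirroring what Iceberg's FlinkSchemaUtil.convert does internally. The wrapper
class and the example columns are illustrative only:

import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.apache.inlong.sort.iceberg.FlinkTypeToType;

public class FlinkTypeToTypeSketch {
    public static void main(String[] args) {
        // Two-column Flink row type: (f0 INT, f1 STRING).
        RowType rowType = RowType.of(new IntType(), new VarCharType(VarCharType.MAX_LENGTH));
        // Root fields take ids 0..n-1; nested types get fresh ids from the counter.
        Types.StructType struct = (Types.StructType) rowType.accept(new FlinkTypeToType(rowType));
        Schema icebergSchema = new Schema(struct.fields());
        System.out.println(icebergSchema);
    }
}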
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index 37eb2670b..8296d02ff 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -19,6 +19,8 @@
package org.apache.inlong.sort.iceberg;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogTable;
@@ -29,18 +31,25 @@ import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
import org.apache.inlong.sort.iceberg.sink.FlinkSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ADD_COLUMN_POLICY;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DEL_COLUMN_POLICY;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
/**
@@ -56,6 +65,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
private final TableSchema tableSchema;
private final CatalogTable catalogTable;
+ private final CatalogLoader catalogLoader;
private boolean overwrite = false;
@@ -64,12 +74,15 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
this.tableSchema = toCopy.tableSchema;
this.overwrite = toCopy.overwrite;
this.catalogTable = toCopy.catalogTable;
+ this.catalogLoader = toCopy.catalogLoader;
}
- public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema, CatalogTable catalogTable) {
+ public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema, CatalogTable catalogTable,
+ CatalogLoader catalogLoader) {
this.tableLoader = tableLoader;
this.tableSchema = tableSchema;
this.catalogTable = catalogTable;
+ this.catalogLoader = catalogLoader;
}
@Override
@@ -81,17 +94,33 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
.map(UniqueConstraint::getColumns)
.orElseGet(ImmutableList::of);
- return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
- .tableLoader(tableLoader)
- .tableSchema(tableSchema)
- .equalityFieldColumns(equalityColumns)
- .overwrite(overwrite)
- .appendMode(Boolean.valueOf(
- Optional.ofNullable(catalogTable.getOptions().get(ICEBERG_IGNORE_ALL_CHANGELOG.key()))
- .orElse(ICEBERG_IGNORE_ALL_CHANGELOG.defaultValue().toString())))
- .metric(catalogTable.getOptions().getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()),
- catalogTable.getOptions().getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()))
- .append();
+ final ReadableConfig tableOptions = Configuration.fromMap(catalogTable.getOptions());
+ boolean multipleSink = tableOptions.get(SINK_MULTIPLE_ENABLE);
+ if (multipleSink) {
+ return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
+ .overwrite(overwrite)
+ .appendMode(tableOptions.get(ICEBERG_IGNORE_ALL_CHANGELOG))
+ .metric(tableOptions.get(INLONG_METRIC), tableOptions.get(INLONG_AUDIT))
+ .catalogLoader(catalogLoader)
+ .multipleSink(tableOptions.get(SINK_MULTIPLE_ENABLE))
+ .multipleSinkOption(MultipleSinkOption.builder()
+ .withFormat(tableOptions.get(SINK_MULTIPLE_FORMAT))
+ .withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN))
+ .withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN))
+ .withAddColumnPolicy(tableOptions.get(SINK_MULTIPLE_ADD_COLUMN_POLICY))
+ .withDelColumnPolicy(tableOptions.get(SINK_MULTIPLE_DEL_COLUMN_POLICY))
+ .build())
+ .append();
+ } else {
+ return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
+ .tableLoader(tableLoader)
+ .tableSchema(tableSchema)
+ .equalityFieldColumns(equalityColumns)
+ .overwrite(overwrite)
+ .appendMode(tableOptions.get(ICEBERG_IGNORE_ALL_CHANGELOG))
+ .metric(tableOptions.get(INLONG_METRIC), tableOptions.get(INLONG_AUDIT))
+ .append();
+ }
}
@Override
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
index dae992042..4ec9683df 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
@@ -28,7 +28,7 @@ import java.util.List;
/**
* Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
*/
-class DeltaManifests {
+public class DeltaManifests {
private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0];
@@ -36,11 +36,11 @@ class DeltaManifests {
private final ManifestFile deleteManifest;
private final CharSequence[] referencedDataFiles;
- DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) {
+ public DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) {
this(dataManifest, deleteManifest, EMPTY_REF_DATA_FILES);
}
- DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest, CharSequence[] referencedDataFiles) {
+ public DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest, CharSequence[] referencedDataFiles) {
Preconditions.checkNotNull(referencedDataFiles, "Referenced data files shouldn't be null.");
this.dataManifest = dataManifest;
@@ -48,19 +48,19 @@ class DeltaManifests {
this.referencedDataFiles = referencedDataFiles;
}
- ManifestFile dataManifest() {
+ public ManifestFile dataManifest() {
return dataManifest;
}
- ManifestFile deleteManifest() {
+ public ManifestFile deleteManifest() {
return deleteManifest;
}
- CharSequence[] referencedDataFiles() {
+ public CharSequence[] referencedDataFiles() {
return referencedDataFiles;
}
- List<ManifestFile> manifests() {
+ public List<ManifestFile> manifests() {
List<ManifestFile> manifests = Lists.newArrayListWithCapacity(2);
if (dataManifest != null) {
manifests.add(dataManifest);
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
index 1ae2bcbe5..4a53ab574 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
@@ -33,12 +33,12 @@ import java.io.IOException;
/**
* Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
*/
-class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
+public class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
private static final int VERSION_1 = 1;
private static final int VERSION_2 = 2;
private static final byte[] EMPTY_BINARY = new byte[0];
- static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer();
+ public static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer();
@Override
public int getVersion() {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
index 46d7beec9..3a2cc8fff 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
@@ -41,14 +41,14 @@ import java.util.function.Supplier;
/**
* Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
*/
-class FlinkManifestUtil {
+public class FlinkManifestUtil {
private static final int FORMAT_V2 = 2;
private static final Long DUMMY_SNAPSHOT_ID = 0L;
private FlinkManifestUtil() {
}
- static ManifestFile writeDataFiles(OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles)
+ public static ManifestFile writeDataFiles(OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles)
throws IOException {
ManifestWriter<DataFile> writer = ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
@@ -59,19 +59,19 @@ class FlinkManifestUtil {
return writer.toManifestFile();
}
- static List<DataFile> readDataFiles(ManifestFile manifestFile, FileIO io) throws IOException {
+ public static List<DataFile> readDataFiles(ManifestFile manifestFile, FileIO io) throws IOException {
try (CloseableIterable<DataFile> dataFiles = ManifestFiles.read(manifestFile, io)) {
return Lists.newArrayList(dataFiles);
}
}
- static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
+ public static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
long attemptNumber) {
TableOperations ops = ((HasTableOperations) table).operations();
return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
}
- static DeltaManifests writeCompletedFiles(WriteResult result,
+ public static DeltaManifests writeCompletedFiles(WriteResult result,
Supplier<OutputFile> outputFileSupplier,
PartitionSpec spec) throws IOException {
@@ -101,7 +101,7 @@ class FlinkManifestUtil {
return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
}
- static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io) throws IOException {
+ public static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io) throws IOException {
WriteResult.Builder builder = WriteResult.builder();
// Read the completed data files from persisted data manifest file.
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 8662722d8..bb7498650 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -40,6 +40,8 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
@@ -49,6 +51,15 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergMultipleFilesCommiter;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergMultipleStreamWriter;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergProcessOperator;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergSingleFileCommiter;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergSingleStreamWriter;
+import org.apache.inlong.sort.iceberg.sink.multiple.MultipleWriteResult;
+import org.apache.inlong.sort.iceberg.sink.multiple.RecordWithSchema;
+import org.apache.inlong.sort.iceberg.sink.multiple.DynamicSchemaHandleOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,8 +87,14 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
public class FlinkSink {
private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
- private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
- private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();
+ private static final String ICEBERG_STREAM_WRITER_NAME = IcebergSingleStreamWriter.class.getSimpleName();
+ private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergSingleFileCommiter.class.getSimpleName();
+ private static final String ICEBERG_MULTIPLE_STREAM_WRITER_NAME =
+ IcebergMultipleStreamWriter.class.getSimpleName();
+ private static final String ICEBERG_MULTIPLE_FILES_COMMITTER_NAME =
+ IcebergMultipleFilesCommiter.class.getSimpleName();
+ private static final String ICEBERG_WHOLE_DATABASE_MIGRATION_NAME =
+ DynamicSchemaHandleOperator.class.getSimpleName();
private FlinkSink() {
}
@@ -141,6 +158,9 @@ public class FlinkSink {
private String uidPrefix = null;
private String inlongMetric = null;
private String auditHostAndPorts = null;
+ private CatalogLoader catalogLoader = null;
+ private boolean multipleSink = false;
+ private MultipleSinkOption multipleSinkOption = null;
private Builder() {
}
@@ -166,9 +186,9 @@ public class FlinkSink {
}
/**
- * This iceberg {@link Table} instance is used for initializing {@link IcebergStreamWriter} which will write all
- * the records into {@link DataFile}s and emit them to downstream operator. Providing a table would avoid so
- * many table loading from each separate task.
+ * This iceberg {@link Table} instance is used for initializing {@link IcebergSingleStreamWriter} which will
+ * write all the records into {@link DataFile}s and emit them to downstream operator. Providing a table
+ * would avoid loading the table repeatedly from each separate task.
*
* @param newTable the loaded iceberg table instance.
* @return {@link Builder} to connect the iceberg table.
@@ -179,7 +199,7 @@ public class FlinkSink {
}
/**
- * The table loader is used for loading tables in {@link IcebergFilesCommitter} lazily, we need this loader
+ * The table loader is used for loading tables in {@link IcebergSingleFileCommiter} lazily, we need this loader
* because {@link Table} is not serializable and could not just use the loaded table from Builder#table in the
* remote task manager.
*
@@ -196,6 +216,29 @@ public class FlinkSink {
return this;
}
+ /**
+ * The catalog loader is used for loading tables in {@link IcebergMultipleStreamWriter} and
+ * {@link DynamicSchemaHandleOperator} lazily; we need this loader because in the multiple-sink scenario
+ * the table to load is only determined at runtime, so we must hold a {@link org.apache.iceberg.catalog.Catalog}
+ * at runtime.
+ *
+ * @param catalogLoader to load iceberg catalog inside tasks.
+ * @return {@link Builder} to connect the iceberg table.
+ */
+ public Builder catalogLoader(CatalogLoader catalogLoader) {
+ this.catalogLoader = catalogLoader;
+ return this;
+ }
+
+ public Builder multipleSink(boolean multipleSink) {
+ this.multipleSink = multipleSink;
+ return this;
+ }
+
+ public Builder multipleSinkOption(MultipleSinkOption multipleSinkOption) {
+ this.multipleSinkOption = multipleSinkOption;
+ return this;
+ }
+
public Builder overwrite(boolean newOverwrite) {
this.overwrite = newOverwrite;
return this;
@@ -302,6 +345,11 @@ public class FlinkSink {
}
private <T> DataStreamSink<T> chainIcebergOperators() {
+ return multipleSink ? chainIcebergMultipleSinkOperators() : chainIcebergSingleSinkOperators();
+ }
+
+ private <T> DataStreamSink<T> chainIcebergSingleSinkOperators() {
+
Preconditions.checkArgument(inputCreator != null,
"Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");
@@ -335,6 +383,24 @@ public class FlinkSink {
return appendDummySink(committerStream);
}
+ private <T> DataStreamSink<T> chainIcebergMultipleSinkOperators() {
+ Preconditions.checkArgument(inputCreator != null,
+ "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
+ DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);
+
+ // Add parallel writers that append rows to files
+ SingleOutputStreamOperator<MultipleWriteResult> writerStream = appendMultipleWriter(rowDataInput);
+
+ // Add single-parallelism committer that commits files
+ // after successful checkpoint or end of input
+ SingleOutputStreamOperator<Void> committerStream = appendMultipleCommitter(writerStream);
+
+ // Add dummy discard sink
+ return appendDummySink(committerStream);
+ }
+
+
+
/**
* Append the iceberg sink operators to write records to iceberg table.
*
@@ -364,7 +430,8 @@ public class FlinkSink {
private <T> DataStreamSink<T> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
DataStreamSink<T> resultStream = committerStream
.addSink(new DiscardingSink())
- .name(operatorName(String.format("IcebergSink %s", this.table.name())))
+ .name(operatorName(
+ String.format("IcebergSink %s", multipleSink ? "whole migration" : this.table.name())))
.setParallelism(1);
if (uidPrefix != null) {
resultStream = resultStream.uid(uidPrefix + "-dummysink");
@@ -373,7 +440,8 @@ public class FlinkSink {
}
private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
- IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
+ IcebergProcessOperator<WriteResult, Void> filesCommitter = new IcebergProcessOperator<>(
+ new IcebergSingleFileCommiter(TableIdentifier.of(table.name()), tableLoader, overwrite));
SingleOutputStreamOperator<Void> committerStream = writerStream
.transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
.setParallelism(1)
@@ -384,6 +452,20 @@ public class FlinkSink {
return committerStream;
}
+ private SingleOutputStreamOperator<Void> appendMultipleCommitter(
+ SingleOutputStreamOperator<MultipleWriteResult> writerStream) {
+ IcebergProcessOperator<MultipleWriteResult, Void> multipleFilesCommiter =
+ new IcebergProcessOperator<>(new IcebergMultipleFilesCommiter(catalogLoader, overwrite));
+ SingleOutputStreamOperator<Void> committerStream = writerStream
+ .transform(operatorName(ICEBERG_MULTIPLE_FILES_COMMITTER_NAME), Types.VOID, multipleFilesCommiter)
+ .setParallelism(1)
+ .setMaxParallelism(1);
+ if (uidPrefix != null) {
+ committerStream = committerStream.uid(uidPrefix + "-committer");
+ }
+ return committerStream;
+ }
+
private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
// Find out the equality field id list based on the user-provided equality field column names.
List<Integer> equalityFieldIds = Lists.newArrayList();
@@ -416,7 +498,7 @@ public class FlinkSink {
}
}
- IcebergStreamWriter<RowData> streamWriter = createStreamWriter(
+ IcebergProcessOperator<RowData, WriteResult> streamWriter = createStreamWriter(
table, flinkRowType, equalityFieldIds, upsertMode, appendMode, inlongMetric, auditHostAndPorts);
int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
@@ -431,6 +513,33 @@ public class FlinkSink {
return writerStream;
}
+ private SingleOutputStreamOperator<MultipleWriteResult> appendMultipleWriter(DataStream<RowData> input) {
+ // equality field will be initialized at runtime
+ // upsert mode will be initialized at runtime
+
+ int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
+ DynamicSchemaHandleOperator routeOperator = new DynamicSchemaHandleOperator(
+ catalogLoader,
+ multipleSinkOption);
+ SingleOutputStreamOperator<RecordWithSchema> routeStream = input
+ .transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME),
+ TypeInformation.of(RecordWithSchema.class),
+ routeOperator)
+ .setParallelism(parallelism);
+
+ IcebergProcessOperator streamWriter =
+ new IcebergProcessOperator(new IcebergMultipleStreamWriter(appendMode, catalogLoader));
+ SingleOutputStreamOperator<MultipleWriteResult> writerStream = routeStream
+ .transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME),
+ TypeInformation.of(IcebergProcessOperator.class),
+ streamWriter)
+ .setParallelism(parallelism);
+ if (uidPrefix != null) {
+ writerStream = writerStream.uid(uidPrefix + "-writer");
+ }
+ return writerStream;
+ }
+
private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
Map<String, String> properties,
PartitionSpec partitionSpec,
@@ -487,7 +596,7 @@ public class FlinkSink {
}
}
- static IcebergStreamWriter<RowData> createStreamWriter(Table table,
+ static IcebergProcessOperator<RowData, WriteResult> createStreamWriter(Table table,
RowType flinkRowType,
List<Integer> equalityFieldIds,
boolean upsert,
@@ -501,10 +610,11 @@ public class FlinkSink {
Table serializableTable = SerializableTable.copyOf(table);
TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
- serializableTable, flinkRowType, targetFileSize,
+ serializableTable, serializableTable.schema(), flinkRowType, targetFileSize,
fileFormat, equalityFieldIds, upsert, appendMode);
- return new IcebergStreamWriter<>(table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts);
+ return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>(
+ table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts));
}
private static FileFormat getFileFormat(Map<String, String> properties) {
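Putting the new builder pieces together: the multiple-sink path is wired the same way
IcebergTableSink does earlier in this commit. A condensed sketch; the method arguments stand in
for the caller's objects, 'canal-json' for whatever change-log format the job actually carries,
and the column policies are left at their assumed defaults:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.inlong.sort.base.sink.MultipleSinkOption;
import org.apache.inlong.sort.iceberg.sink.FlinkSink;

public class MultipleSinkWiringSketch {
    static void wire(DataStream<RowData> dataStream, CatalogLoader catalogLoader) {
        FlinkSink.forRowData(dataStream)
                .overwrite(false)
                .appendMode(true)
                .catalogLoader(catalogLoader)   // which table to load is decided at runtime
                .multipleSink(true)             // switch into the multiple-sink operator chain
                .multipleSinkOption(MultipleSinkOption.builder()
                        .withFormat("canal-json")
                        .withDatabasePattern("${database}")
                        .withTablePattern("${table}")
                        .build())
                .append();
    }
}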
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
index 97f4f3f02..95549d7d4 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
*/
-class ManifestOutputFileFactory {
+public class ManifestOutputFileFactory {
// Users could define their own flink manifests directory by setting this value in table properties.
static final String FLINK_MANIFEST_LOCATION = "flink.manifests.location";
@@ -58,7 +58,7 @@ class ManifestOutputFileFactory {
attemptNumber, checkpointId, fileCount.incrementAndGet()));
}
- OutputFile create(long checkpointId) {
+ public OutputFile create(long checkpointId) {
String flinkManifestDir = props.get(FLINK_MANIFEST_LOCATION);
String newManifestFullPath;
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
index aa724ac7f..f642e86aa 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
@@ -62,6 +62,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
private transient OutputFileFactory outputFileFactory;
public RowDataTaskWriterFactory(Table table,
+ Schema schema,
RowType flinkSchema,
long targetFileSizeBytes,
FileFormat format,
@@ -69,7 +70,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
boolean upsert,
boolean appendMode) {
this.table = table;
- this.schema = table.schema();
+ this.schema = schema;
this.flinkSchema = flinkSchema;
this.spec = table.spec();
this.io = table.io();
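The extra Schema parameter decouples the write schema from table.schema(): the single-table path
keeps passing the table's own schema (as createStreamWriter now does in FlinkSink), while the
multiple-sink writer can substitute a schema refreshed at runtime. A small wrapper sketch of the
new constructor shape; the wrapper class and method are hypothetical:

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;

import java.util.List;

public class WriterFactorySketch {
    static TaskWriterFactory<RowData> factoryFor(Table table, Schema writeSchema,
            RowType flinkRowType, long targetFileSize, FileFormat format,
            List<Integer> equalityFieldIds, boolean upsert, boolean appendMode) {
        // Passing table.schema() as writeSchema reproduces the old behavior;
        // the multiple-sink writer passes a newer schema for the same table instead.
        return new RowDataTaskWriterFactory(table, writeSchema, flinkRowType,
                targetFileSize, format, equalityFieldIds, upsert, appendMode);
    }
}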
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
new file mode 100644
index 000000000..ea69c12f9
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+import org.apache.inlong.sort.base.sink.TableChange;
+import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
+import org.apache.inlong.sort.base.sink.TableChange.DeleteColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE;
+
+public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWithSchema>
+ implements OneInputStreamOperator<RowData, RecordWithSchema>, ProcessingTimeCallback {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicSchemaHandleOperator.class);
+ private static final long HELPER_DEBUG_INTERVAL = 10 * 60 * 1000;
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ private final CatalogLoader catalogLoader;
+ private final MultipleSinkOption multipleSinkOption;
+
+ private transient Catalog catalog;
+ private transient SupportsNamespaces asNamespaceCatalog;
+ private transient AbstractDynamicSchemaFormat<JsonNode> dynamicSchemaFormat;
+ private transient ProcessingTimeService processingTimeService;
+
+ // record cache: buffer records until their schema is available
+ private transient Map<TableIdentifier, Queue<RecordWithSchema>> recordQueues;
+
+ // schema cache
+ private transient Map<TableIdentifier, Schema> schemaCache;
+
+ // blacklist to filter out tables whose schema update failed
+ private transient Set<TableIdentifier> blacklist;
+
+ public DynamicSchemaHandleOperator(CatalogLoader catalogLoader,
+ MultipleSinkOption multipleSinkOption) {
+ this.catalogLoader = catalogLoader;
+ this.multipleSinkOption = multipleSinkOption;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.catalog = catalogLoader.loadCatalog();
+ this.asNamespaceCatalog =
+ catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
+ this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+ this.processingTimeService = getRuntimeContext().getProcessingTimeService();
+ processingTimeService.registerTimer(
+ processingTimeService.getCurrentProcessingTime() + HELPER_DEBUG_INTERVAL, this);
+
+ this.recordQueues = new HashMap<>();
+ this.schemaCache = new HashMap<>();
+ this.blacklist = new HashSet<>();
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (catalog instanceof Closeable) {
+ ((Closeable) catalog).close();
+ }
+ }
+
+ @Override
+ public void processElement(StreamRecord<RowData> element) throws Exception {
+ JsonNode jsonNode = dynamicSchemaFormat.deserialize(element.getValue().getBinary(0));
+
+ TableIdentifier tableId = parseId(jsonNode);
+ if (blacklist.contains(tableId)) {
+ return;
+ }
+
+ boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
+ if (isDDL) {
+ execDDL(jsonNode, tableId);
+ } else {
+ execDML(jsonNode, tableId);
+ }
+ }
+
+ @Override
+ public void onProcessingTime(long timestamp) {
+ LOG.info("Black list table: {} at time {}.", blacklist, timestamp);
+ processingTimeService.registerTimer(
+ processingTimeService.getCurrentProcessingTime() + HELPER_DEBUG_INTERVAL, this);
+ }
+
+ private void execDDL(JsonNode jsonNode, TableIdentifier tableId) {
+ // TODO: parse DDL SQL
+ }
+
+ private void execDML(JsonNode jsonNode, TableIdentifier tableId) {
+ RecordWithSchema record = parseRecord(jsonNode, tableId);
+ Schema schema = schemaCache.get(record.getTableId());
+ Schema dataSchema = record.getSchema();
+ recordQueues.compute(record.getTableId(), (k, v) -> {
+ if (v == null) {
+ v = new LinkedList<>();
+ }
+ v.add(record);
+ return v;
+ });
+
+ if (schema == null) {
+ handleTableCreateEventFromOperator(record.getTableId(), dataSchema);
+ } else {
+ handleSchemaInfoEvent(record.getTableId(), schema);
+ }
+ }
+
+ // ======================== Coordinator request and response methods ============================
+ private void handleSchemaInfoEvent(TableIdentifier tableId, Schema schema) {
+ schemaCache.put(tableId, schema);
+ Schema latestSchema = schemaCache.get(tableId);
+ Queue<RecordWithSchema> queue = recordQueues.get(tableId);
+ while (queue != null && !queue.isEmpty()) {
+ Schema dataSchema = queue.peek().getSchema();
+ // if compatible, the cached schema is already the latest schema
+ // if not, the cached schema needs to be updated before the record can be consumed
+ if (isCompatible(latestSchema, dataSchema)) {
+ RecordWithSchema recordWithSchema = queue.poll()
+ .refreshFieldId(latestSchema)
+ .refreshRowData((jsonNode, schema1) -> {
+ try {
+ return dynamicSchemaFormat.extractRowData(jsonNode, FlinkSchemaUtil.convert(schema1));
+ } catch (Exception e) {
+ LOG.warn("Ignore table {} schema change, old: {} new: {}.",
+ tableId, dataSchema, latestSchema, e);
+ blacklist.add(tableId);
+ }
+ return Collections.emptyList();
+ });
+ output.collect(new StreamRecord<>(recordWithSchema));
+ } else {
+ handleAlterSchemaEventFromOperator(tableId, latestSchema, dataSchema);
+ }
+ }
+ }
+
+ // ================================ Coordinator handling methods ==============================================
+ private void handleTableCreateEventFromOperator(TableIdentifier tableId, Schema schema) {
+ if (!catalog.tableExists(tableId)) {
+ if (asNamespaceCatalog != null && !asNamespaceCatalog.namespaceExists(tableId.namespace())) {
+ try {
+ asNamespaceCatalog.createNamespace(tableId.namespace());
+ LOG.info("Auto create Database({}) in Catalog({}).", tableId.namespace(), catalog.name());
+ } catch (AlreadyExistsException e) {
+ LOG.warn("Database({}) already exist in Catalog({})!", tableId.namespace(), catalog.name());
+ }
+ }
+
+ ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+ properties.put("format-version", "2");
+ properties.put("write.upsert.enabled", "true");
+ // make the table visible to Hive
+ properties.put("engine.hive.enabled", "true");
+
+ try {
+ catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), properties.build());
+ LOG.info("Auto create Table({}) in Database({}) in Catalog({})!",
+ tableId.name(), tableId.namespace(), catalog.name());
+ } catch (AlreadyExistsException e) {
+ LOG.warn("Table({}) already exist in Database({}) in Catalog({})!",
+ tableId.name(), tableId.namespace(), catalog.name());
+ }
+ }
+
+ handleSchemaInfoEvent(tableId, catalog.loadTable(tableId).schema());
+ }
+
+ private void handleAlterSchemaEventFromOperator(TableIdentifier tableId, Schema oldSchema, Schema newSchema) {
+ Table table = catalog.loadTable(tableId);
+
+ // Transactionality of the change is guaranteed by first comparing the old schema with the table's
+ // current schema. Whether a change can be applied is judged by schema comparison (currently only
+ // column additions are supported); when a change cannot be applied, the data itself is always
+ // considered to be at fault.
+ Transaction transaction = table.newTransaction();
+ if (table.schema().sameSchema(oldSchema)) {
+ List<TableChange> tableChanges = SchemaChangeUtils.diffSchema(oldSchema, newSchema);
+ if (canHandleWithSchemaUpdate(tableId, tableChanges)) {
+ SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
+ LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
+ }
+ }
+ transaction.commitTransaction();
+ handleSchemaInfoEvent(tableId, table.schema());
+ }
+
+ // =============================== Utility methods =================================================================
+ // Compatibility is judged by whether every field name in the old schema still exists in the new schema
+ private boolean isCompatible(Schema newSchema, Schema oldSchema) {
+ for (NestedField field : oldSchema.columns()) {
+ if (newSchema.findField(field.name()) == null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private TableIdentifier parseId(JsonNode data) throws IOException {
+ String databaseStr = dynamicSchemaFormat.parse(data, multipleSinkOption.getDatabasePattern());
+ String tableStr = dynamicSchemaFormat.parse(data, multipleSinkOption.getTablePattern());
+ return TableIdentifier.of(databaseStr, tableStr);
+ }
+
+ private RecordWithSchema parseRecord(JsonNode data, TableIdentifier tableId) {
+ List<String> pkListStr = dynamicSchemaFormat.extractPrimaryKeyNames(data);
+ RowType schema = dynamicSchemaFormat.extractSchema(data, pkListStr);
+
+ RecordWithSchema record = new RecordWithSchema(
+ data,
+ FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(schema)),
+ tableId,
+ pkListStr);
+ return record;
+ }
+
+ private boolean canHandleWithSchemaUpdate(TableIdentifier tableId, List<TableChange> tableChanges) {
+ boolean canHandle = true;
+ for (TableChange tableChange : tableChanges) {
+ if (tableChange instanceof AddColumn) {
+ canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
+ multipleSinkOption.getAddColumnPolicy());
+ } else if (tableChange instanceof DeleteColumn) {
+ canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
+ multipleSinkOption.getDelColumnPolicy());
+ } else {
+ canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
+ LOG_WITH_IGNORE);
+ }
+ }
+
+ if (!canHandle) {
+ blacklist.add(tableId);
+ }
+ return canHandle;
+ }
+}
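Note that the compatibility check above is deliberately one-directional: every field of the
cached (old) schema must still exist, by name, in the incoming data schema, so added columns pass
through while dropped or renamed columns trigger the schema-evolution path. Restated as a
standalone sketch (duplicating the private isCompatible logic purely for illustration):

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class CompatibilitySketch {
    // Same rule as DynamicSchemaHandleOperator#isCompatible above.
    static boolean isCompatible(Schema newSchema, Schema oldSchema) {
        for (Types.NestedField field : oldSchema.columns()) {
            if (newSchema.findField(field.name()) == null) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Schema cached = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()));
        Schema withAddedColumn = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "name", Types.StringType.get()));
        System.out.println(isCompatible(withAddedColumn, cached)); // true: column adds are fine
    }
}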
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleFilesCommiter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleFilesCommiter.java
new file mode 100644
index 000000000..131daba7f
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleFilesCommiter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.TableLoader;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class IcebergMultipleFilesCommiter extends IcebergProcessFunction<MultipleWriteResult, Void>
+ implements CheckpointedFunction, CheckpointListener, BoundedOneInput {
+
+ private Map<TableIdentifier, IcebergSingleFileCommiter> multipleCommiters;
+ private final CatalogLoader catalogLoader;
+ private final boolean overwrite;
+
+ public IcebergMultipleFilesCommiter(CatalogLoader catalogLoader, boolean overwrite) {
+ this.catalogLoader = catalogLoader;
+ this.overwrite = overwrite;
+ }
+
+ private transient FunctionInitializationContext functionInitializationContext;
+
+ @Override
+ public void processElement(MultipleWriteResult value) throws Exception {
+ TableIdentifier tableId = value.getTableId();
+ if (multipleCommiters.get(tableId) == null) {
+ IcebergSingleFileCommiter commiter = new IcebergSingleFileCommiter(
+ tableId, TableLoader.fromCatalog(catalogLoader, value.getTableId()), overwrite);
+ commiter.setup(getRuntimeContext(), collector, context);
+ commiter.initializeState(functionInitializationContext);
+ commiter.open(new Configuration());
+ multipleCommiters.put(tableId, commiter);
+ }
+
+ multipleCommiters.get(tableId).processElement(value.getWriteResult());
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ for (Entry<TableIdentifier, IcebergSingleFileCommiter> entry : multipleCommiters.entrySet()) {
+ entry.getValue().snapshotState(context);
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ this.functionInitializationContext = context;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ for (Entry<TableIdentifier, IcebergSingleFileCommiter> entry : multipleCommiters.entrySet()) {
+ entry.getValue().notifyCheckpointComplete(checkpointId);
+ }
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ multipleCommiters = new HashMap<>();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (multipleCommiters == null) {
+ return;
+ }
+
+ for (Entry<TableIdentifier, IcebergSingleFileCommiter> entry : multipleCommiters.entrySet()) {
+ entry.getValue().close();
+ }
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ for (Entry<TableIdentifier, IcebergSingleFileCommiter> entry : multipleCommiters.entrySet()) {
+ entry.getValue().endInput();
+ }
+ }
+
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
new file mode 100644
index 000000000..4c3fb0045
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+/**
+ * Iceberg writer that distinguishes between different sink tables and routes each record to the
+ * corresponding IcebergSingleStreamWriter.
+ */
+public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWithSchema, MultipleWriteResult>
+ implements CheckpointedFunction, BoundedOneInput {
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergMultipleStreamWriter.class);
+
+ private final boolean appendMode;
+ private final CatalogLoader catalogLoader;
+
+ private transient Catalog catalog;
+ private transient Map<TableIdentifier, IcebergSingleStreamWriter<RowData>> multipleWriters;
+ private transient Map<TableIdentifier, Table> multipleTables;
+ private transient Map<TableIdentifier, Schema> multipleSchemas;
+ private transient FunctionInitializationContext functionInitializationContext;
+
+ public IcebergMultipleStreamWriter(boolean appendMode, CatalogLoader catalogLoader) {
+ this.appendMode = appendMode;
+ this.catalogLoader = catalogLoader;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ this.catalog = catalogLoader.loadCatalog();
+ this.multipleWriters = new HashMap<>();
+ this.multipleTables = new HashMap<>();
+ this.multipleSchemas = new HashMap<>();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (catalog instanceof Closeable) {
+ ((Closeable) catalog).close();
+ }
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+ entry.getValue().endInput();
+ }
+ }
+
+ @Override
+ public void dispose() throws Exception {
+ super.dispose();
+ for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+ entry.getValue().dispose();
+ }
+ multipleWriters.clear();
+ multipleTables.clear();
+ multipleSchemas.clear();
+ }
+
+ @Override
+ public void processElement(RecordWithSchema recordWithSchema) throws Exception {
+ TableIdentifier tableId = recordWithSchema.getTableId();
+
+ if (isSchemaUpdate(recordWithSchema)) {
+ if (multipleTables.get(tableId) == null) {
+ Table table = catalog.loadTable(recordWithSchema.getTableId());
+ multipleTables.put(tableId, table);
+ }
+
+ // refresh some runtime table properties
+ Table table = multipleTables.get(recordWithSchema.getTableId());
+ Map<String, String> tableProperties = table.properties();
+ boolean upsertMode = PropertyUtil.propertyAsBoolean(tableProperties,
+ UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
+ long targetFileSizeBytes = PropertyUtil.propertyAsLong(tableProperties,
+ WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ String formatString = tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+ FileFormat fileFormat = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+ List<Integer> equalityFieldIds = recordWithSchema.getPrimaryKeys().stream()
+ .map(pk -> recordWithSchema.getSchema().findField(pk).fieldId())
+ .collect(Collectors.toList());
+            // If no physical primary key exists, use all fields as the logical primary key.
+ if (equalityFieldIds.isEmpty()) {
+ equalityFieldIds = recordWithSchema.getSchema().columns().stream()
+ .map(NestedField::fieldId)
+ .collect(Collectors.toList());
+ }
+
+ TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
+ table,
+ recordWithSchema.getSchema(),
+ FlinkSchemaUtil.convert(recordWithSchema.getSchema()),
+ targetFileSizeBytes,
+ fileFormat,
+ equalityFieldIds,
+ upsertMode,
+ appendMode);
+
+ if (multipleWriters.get(tableId) == null) {
+ IcebergSingleStreamWriter<RowData> writer = new IcebergSingleStreamWriter<>(
+ tableId.toString(), taskWriterFactory, null, null);
+ writer.setup(getRuntimeContext(),
+ new CallbackCollector<>(writeResult ->
+ collector.collect(new MultipleWriteResult(tableId, writeResult))),
+ context);
+ writer.initializeState(functionInitializationContext);
+ writer.open(new Configuration());
+ multipleWriters.put(tableId, writer);
+            } else { // reached only when the schema evolves after the writer was created
+                // Refreshing to the new schema may interrupt the previous file writer,
+                // so hand the new writer factory to the existing writer here.
+ multipleWriters.get(tableId).schemaEvolution(taskWriterFactory);
+ }
+
+ }
+
+ if (multipleWriters.get(tableId) != null) {
+ for (RowData data : recordWithSchema.getData()) {
+ multipleWriters.get(tableId).processElement(data);
+ }
+ } else {
+ LOG.error("Unregistered table schema for {}.", recordWithSchema.getTableId());
+ }
+ }
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+ entry.getValue().prepareSnapshotPreBarrier(checkpointId);
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) {
+ entry.getValue().snapshotState(context);
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ this.functionInitializationContext = context;
+ }
+
+ private boolean isSchemaUpdate(RecordWithSchema recordWithSchema) {
+ TableIdentifier tableId = recordWithSchema.getTableId();
+ recordWithSchema.replaceSchema();
+ if (multipleSchemas.get(tableId) != null
+ && multipleSchemas.get(tableId).sameSchema(recordWithSchema.getSchema())) {
+ return false;
+ }
+ LOG.info("Schema evolution with table {}, old schema: {}, new Schema: {}",
+ tableId, multipleSchemas.get(tableId), recordWithSchema.getSchema());
+ multipleSchemas.put(recordWithSchema.getTableId(), recordWithSchema.getSchema());
+ return true;
+ }
+}
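
One detail worth calling out from processElement above: when a record carries no physical
primary key, every column is promoted into the logical key so that upsert semantics still have a
deterministic equality definition. Extracted as a small helper (a sketch using the same Iceberg
Schema API as the writer; the EqualityFields name is made up for illustration):

    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types.NestedField;

    final class EqualityFields {
        // Derives equality field ids from declared primary keys, falling back
        // to all columns when no physical primary key exists.
        static List<Integer> of(Schema schema, List<String> primaryKeys) {
            List<Integer> ids = primaryKeys.stream()
                    .map(pk -> schema.findField(pk).fieldId())
                    .collect(Collectors.toList());
            if (ids.isEmpty()) {
                ids = schema.columns().stream()
                        .map(NestedField::fieldId)
                        .collect(Collectors.toList());
            }
            return ids;
        }
    }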
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergProcessFunction.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergProcessFunction.java
new file mode 100644
index 000000000..6c7034488
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergProcessFunction.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+public abstract class IcebergProcessFunction<IN, OUT> extends AbstractRichFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ protected transient Collector<OUT> collector;
+
+ protected transient Context context;
+
+ public abstract void processElement(IN value) throws Exception;
+
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+
+ }
+
+ public void endInput() throws Exception {
+
+ }
+
+ public void dispose() throws Exception {
+
+ }
+
+ public void setup(RuntimeContext t, Collector<OUT> collector, Context context) {
+ setRuntimeContext(t);
+ setCollector(collector);
+ setContext(context);
+ }
+
+ public void setCollector(Collector<OUT> collector) {
+ this.collector = collector;
+ }
+
+ public void setContext(Context context) {
+ this.context = context;
+ }
+
+ public abstract static class Context {
+
+ /**
+ * Timestamp of the element currently being processed or timestamp of a firing timer.
+ *
+ * <p>This might be {@code null}, for example if the time characteristic of your program is
+ * set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+ */
+ public abstract Long timestamp();
+
+ /** A {@link TimerService} for querying time and registering timers. */
+ public abstract TimerService timerService();
+
+ /**
+ * Emits a record to the side output identified by the {@link OutputTag}.
+ *
+ * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
+ * @param value The record to emit.
+ */
+ public abstract <X> void output(OutputTag<X> outputTag, X value);
+ }
+
+ public static class CallbackCollector<T> implements Collector<T> {
+ final ThrowingConsumer<T, Exception> callback;
+
+ CallbackCollector(ThrowingConsumer<T, Exception> callback) {
+ this.callback = callback;
+ }
+
+ @Override
+ public void collect(T t) {
+ try {
+ callback.accept(t);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+}
\ No newline at end of file
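
A subclass only needs to implement processElement and emit through the injected collector;
CallbackCollector is the hook that lets a parent function rewrite a child's output, as
IcebergMultipleStreamWriter does when it wraps each WriteResult into a MultipleWriteResult. A toy
sketch, assuming it sits in the same package (CallbackCollector's constructor is package-private)
and using placeholder String types:

    public class UpperCaseFunction extends IcebergProcessFunction<String, String> {
        @Override
        public void processElement(String value) throws Exception {
            collector.collect(value.toUpperCase());
        }
    }

    // Wiring by hand, outside any Flink topology:
    UpperCaseFunction fn = new UpperCaseFunction();
    fn.setCollector(new IcebergProcessFunction.CallbackCollector<>(
            s -> System.out.println("emitted: " + s)));
    fn.processElement("hello");   // prints "emitted: HELLO"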
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergProcessOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergProcessOperator.java
new file mode 100644
index 000000000..64c36e1b5
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergProcessOperator.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.OutputTag;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergProcessFunction.Context;
+
+public class IcebergProcessOperator<IN, OUT>
+ extends AbstractUdfStreamOperator<OUT, IcebergProcessFunction<IN, OUT>>
+ implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
+ private static final long serialVersionUID = -1837485654246776219L;
+
+ private long currentWatermark = Long.MIN_VALUE;
+
+ private transient TimestampedCollector<OUT> collector;
+
+ private transient ContextImpl context;
+
+ public IcebergProcessOperator(IcebergProcessFunction<IN, OUT> userFunction) {
+ super(userFunction);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ collector = new TimestampedCollector<>(output);
+        context = new ContextImpl(getProcessingTimeService());
+        userFunction.setCollector(collector);
+        // Share the single context instance whose 'element' field is updated in
+        // processElement; a separate instance would never see the current record.
+        userFunction.setContext(context);
+ }
+
+ @Override
+ public void processElement(StreamRecord<IN> streamRecord) throws Exception {
+ collector.setTimestamp(streamRecord);
+ context.element = streamRecord;
+ userFunction.processElement(streamRecord.getValue());
+ context.element = null;
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ super.processWatermark(mark);
+ this.currentWatermark = mark.getTimestamp();
+ }
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ super.prepareSnapshotPreBarrier(checkpointId);
+ userFunction.prepareSnapshotPreBarrier(checkpointId);
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ userFunction.endInput();
+ }
+
+ @Override
+ public void dispose() throws Exception {
+ super.dispose();
+ userFunction.dispose();
+ }
+
+ private class ContextImpl extends Context implements TimerService {
+ private StreamRecord<IN> element;
+
+ private final ProcessingTimeService processingTimeService;
+
+ ContextImpl(ProcessingTimeService processingTimeService) {
+ this.processingTimeService = processingTimeService;
+ }
+
+ @Override
+ public Long timestamp() {
+ return element == null ? null : element.getTimestamp();
+ }
+
+ @Override
+ public TimerService timerService() {
+ return this;
+ }
+
+ @Override
+ public <X> void output(OutputTag<X> outputTag, X value) {
+ if (outputTag == null) {
+ throw new IllegalArgumentException("OutputTag must not be null.");
+ }
+ output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
+ }
+
+ @Override
+ public long currentProcessingTime() {
+ return processingTimeService.getCurrentProcessingTime();
+ }
+
+ @Override
+ public long currentWatermark() {
+ return currentWatermark;
+ }
+
+ @Override
+ public void registerProcessingTimeTimer(long time) {
+ throw new UnsupportedOperationException(UNSUPPORTED_REGISTER_TIMER_MSG);
+ }
+
+ @Override
+ public void registerEventTimeTimer(long time) {
+ throw new UnsupportedOperationException(UNSUPPORTED_REGISTER_TIMER_MSG);
+ }
+
+ @Override
+ public void deleteProcessingTimeTimer(long time) {
+ throw new UnsupportedOperationException(UNSUPPORTED_DELETE_TIMER_MSG);
+ }
+
+ @Override
+ public void deleteEventTimeTimer(long time) {
+ throw new UnsupportedOperationException(UNSUPPORTED_DELETE_TIMER_MSG);
+ }
+ }
+}
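
IcebergProcessOperator is the adapter that lets any IcebergProcessFunction run as a regular
OneInputStreamOperator, so it can be attached with DataStream#transform. A hedged wiring sketch
(the operator name, appendMode=false, and the helper itself are assumptions for illustration; the
real chain is assembled inside FlinkSink):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.iceberg.flink.CatalogLoader;

    final class MultipleSinkWiring {
        // Attaches the multi-table writer to an upstream stream of parsed records.
        static SingleOutputStreamOperator<MultipleWriteResult> writeMultiple(
                DataStream<RecordWithSchema> input, CatalogLoader catalogLoader) {
            return input.transform(
                    "IcebergMultipleStreamWriter",
                    TypeInformation.of(MultipleWriteResult.class),
                    new IcebergProcessOperator<>(
                            new IcebergMultipleStreamWriter(false, catalogLoader)));
        }
    }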
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
similarity index 83%
rename from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
rename to inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
index 7942ef77c..bd28a22a0 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
@@ -1,392 +1,393 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.inlong.sort.iceberg.sink;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.core.io.SimpleVersionedSerialization;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
-import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.ReplacePartitions;
-import org.apache.iceberg.RowDelta;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotUpdate;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.io.WriteResult;
-import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.base.Strings;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Comparators;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.PropertyUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.SortedMap;
-
-/**
- * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
- */
-class IcebergFilesCommitter extends AbstractStreamOperator<Void>
- implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {
-
- private static final long serialVersionUID = 1L;
- private static final long INITIAL_CHECKPOINT_ID = -1L;
- private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
-
- private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
- private static final String FLINK_JOB_ID = "flink.job-id";
-
- // The max checkpoint id we've committed to iceberg table. As the flink's checkpoint is always increasing, so we
- // could correctly commit all the data files whose checkpoint id is greater than the max committed one to iceberg
- // table, for avoiding committing the same data files twice. This id will be attached to iceberg's meta when
- // committing the iceberg transaction.
- private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
- static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
-
- // TableLoader to load iceberg table lazily.
- private final TableLoader tableLoader;
- private final boolean replacePartitions;
-
- // A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
- // to iceberg table). We need a sorted map here because there's possible that few checkpoints snapshot failed, for
- // example: the 1st checkpoint have 2 data files <1, <file0, file1>>, the 2st checkpoint have 1 data files
- // <2, <file3>>. Snapshot for checkpoint#1 interrupted because of network/disk failure etc, while we don't expect
- // any data loss in iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry to commit
- // iceberg table when the next checkpoint happen.
- private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
-
- // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
- // 'dataFilesPerCheckpoint'.
- private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
-
- // It will have an unique identifier for one job.
- private transient String flinkJobId;
- private transient Table table;
- private transient ManifestOutputFileFactory manifestOutputFileFactory;
- private transient long maxCommittedCheckpointId;
- private transient int continuousEmptyCheckpoints;
- private transient int maxContinuousEmptyCommits;
- // There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by
- // the same flink job; another case is restoring from snapshot created by another different job. For the second
- // case, we need to maintain the old flink job's id in flink state backend to find the max-committed-checkpoint-id
- // when traversing iceberg table's snapshots.
- private static final ListStateDescriptor<String> JOB_ID_DESCRIPTOR = new ListStateDescriptor<>(
- "iceberg-flink-job-id", BasicTypeInfo.STRING_TYPE_INFO);
- private transient ListState<String> jobIdState;
- // All pending checkpoints states for this function.
- private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
- private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
-
- IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions) {
- this.tableLoader = tableLoader;
- this.replacePartitions = replacePartitions;
- }
-
- @Override
- public void initializeState(StateInitializationContext context) throws Exception {
- super.initializeState(context);
- this.flinkJobId = getContainingTask().getEnvironment().getJobID().toString();
-
- // Open the table loader and load the table.
- this.tableLoader.open();
- this.table = tableLoader.loadTable();
-
- maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
- Preconditions.checkArgument(maxContinuousEmptyCommits > 0,
- MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive");
-
- int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
- int attemptId = getRuntimeContext().getAttemptNumber();
- this.manifestOutputFileFactory = FlinkManifestUtil
- .createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
- this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
-
- this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
- this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
- if (context.isRestored()) {
- String restoredFlinkJobId = jobIdState.get().iterator().next();
- Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
- "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
-
- // Since flink's checkpoint id will start from the max-committed-checkpoint-id + 1 in the new flink job even
- // if it's restored from a snapshot created by another different flink job, so it's safe to assign the max
- // committed checkpoint id from restored flink job to the current flink job.
- this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
-
- NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
- .newTreeMap(checkpointsState.get().iterator().next())
- .tailMap(maxCommittedCheckpointId, false);
- if (!uncommittedDataFiles.isEmpty()) {
- // Committed all uncommitted data files from the old flink job to iceberg table.
- long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
- commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
- }
- }
- }
-
- @Override
- public void snapshotState(StateSnapshotContext context) throws Exception {
- super.snapshotState(context);
- long checkpointId = context.getCheckpointId();
- LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);
-
- // Update the checkpoint state.
- dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
- // Reset the snapshot state to the latest state.
- checkpointsState.clear();
- checkpointsState.add(dataFilesPerCheckpoint);
-
- jobIdState.clear();
- jobIdState.add(flinkJobId);
-
- // Clear the local buffer for current checkpoint.
- writeResultsOfCurrentCkpt.clear();
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- super.notifyCheckpointComplete(checkpointId);
- // It's possible that we have the following events:
- // 1. snapshotState(ckpId);
- // 2. snapshotState(ckpId+1);
- // 3. notifyCheckpointComplete(ckpId+1);
- // 4. notifyCheckpointComplete(ckpId);
- // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
- // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
- if (checkpointId > maxCommittedCheckpointId) {
- commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
- this.maxCommittedCheckpointId = checkpointId;
- }
- }
-
- private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
- String newFlinkJobId,
- long checkpointId) throws IOException {
- NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
- List<ManifestFile> manifests = Lists.newArrayList();
- NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
- for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
- if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
- // Skip the empty flink manifest.
- continue;
- }
-
- DeltaManifests deltaManifests = SimpleVersionedSerialization
- .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
- pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
- manifests.addAll(deltaManifests.manifests());
- }
-
- int totalFiles = pendingResults.values().stream()
- .mapToInt(r -> r.dataFiles().length + r.deleteFiles().length).sum();
- continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
- if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
- if (replacePartitions) {
- replacePartitions(pendingResults, newFlinkJobId, checkpointId);
- } else {
- commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
- }
- continuousEmptyCheckpoints = 0;
- }
- pendingMap.clear();
-
- // Delete the committed manifests.
- for (ManifestFile manifest : manifests) {
- try {
- table.io().deleteFile(manifest.path());
- } catch (Exception e) {
- // The flink manifests cleaning failure shouldn't abort the completed checkpoint.
- String details = MoreObjects.toStringHelper(this)
- .add("flinkJobId", newFlinkJobId)
- .add("checkpointId", checkpointId)
- .add("manifestPath", manifest.path())
- .toString();
- LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink "
- + "manifests: {}", details, e);
- }
- }
- }
-
- private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId,
- long checkpointId) {
- // Partition overwrite does not support delete files.
- int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
- Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions with delete files.");
-
- // Commit the overwrite transaction.
- ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
-
- int numFiles = 0;
- for (WriteResult result : pendingResults.values()) {
- Preconditions.checkState(result.referencedDataFiles().length == 0,
- "Should have no referenced data files.");
-
- numFiles += result.dataFiles().length;
- Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
- }
-
- commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId);
- }
-
- private void commitDeltaTxn(
- NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
- int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
-
- if (deleteFilesNum == 0) {
- // To be compatible with iceberg format V1.
- AppendFiles appendFiles = table.newAppend();
-
- int numFiles = 0;
- for (WriteResult result : pendingResults.values()) {
- Preconditions.checkState(result.referencedDataFiles().length == 0,
- "Should have no referenced data files.");
-
- numFiles += result.dataFiles().length;
- Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
- }
-
- commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
- } else {
- // To be compatible with iceberg format V2.
- for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
- // We don't commit the merged result into a single transaction because for the sequential transaction
- // txn1 and txn2, the equality-delete files of txn2 are required to be applied to data files from txn1.
- // Committing the merged one will lead to the incorrect delete semantic.
- WriteResult result = e.getValue();
-
- // Row delta validations are not needed for streaming changes that write equality deletes. Equality
- // deletes are applied to data in all previous sequence numbers, so retries may push deletes further in
- // the future, but do not affect correctness. Position deletes committed to the table in this path are
- // used only to delete rows from data files that are being added in this commit. There is no way for
- // data files added along with the delete files to be concurrently removed, so there is no need to
- // validate the files referenced by the position delete files that are being committed.
- RowDelta rowDelta = table.newRowDelta();
-
- int numDataFiles = result.dataFiles().length;
- Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
-
- int numDeleteFiles = result.deleteFiles().length;
- Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
-
- commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, e.getKey());
- }
- }
- }
-
- private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int numDeleteFiles, String description,
- String newFlinkJobId, long checkpointId) {
- LOG.info("Committing {} with {} data files and {} delete files to table {}", description, numDataFiles,
- numDeleteFiles, table);
- operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
- operation.set(FLINK_JOB_ID, newFlinkJobId);
-
- long start = System.currentTimeMillis();
- operation.commit(); // abort is automatically called if this fails.
- long duration = System.currentTimeMillis() - start;
- LOG.info("Committed in {} ms", duration);
- }
-
- @Override
- public void processElement(StreamRecord<WriteResult> element) {
- this.writeResultsOfCurrentCkpt.add(element.getValue());
- }
-
- @Override
- public void endInput() throws IOException {
- // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
- long currentCheckpointId = Long.MAX_VALUE;
- dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
- writeResultsOfCurrentCkpt.clear();
-
- commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
- }
-
- /**
- * Write all the complete data files to a newly created manifest file and return the manifest's avro serialized
- * bytes.
- */
- private byte[] writeToManifest(long checkpointId) throws IOException {
- if (writeResultsOfCurrentCkpt.isEmpty()) {
- return EMPTY_MANIFEST_DATA;
- }
-
- WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
- DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(result,
- () -> manifestOutputFileFactory.create(checkpointId), table.spec());
-
- return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, deltaManifests);
- }
-
- @Override
- public void dispose() throws Exception {
- if (tableLoader != null) {
- tableLoader.close();
- }
- }
-
- private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor() {
- Comparator<Long> longComparator = Comparators.forType(Types.LongType.get());
- // Construct a SortedMapTypeInfo.
- SortedMapTypeInfo<Long, byte[]> sortedMapTypeInfo = new SortedMapTypeInfo<>(
- BasicTypeInfo.LONG_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, longComparator
- );
- return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
- }
-
- static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
- Snapshot snapshot = table.currentSnapshot();
- long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
-
- while (snapshot != null) {
- Map<String, String> summary = snapshot.summary();
- String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
- if (flinkJobId.equals(snapshotFlinkJobId)) {
- String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
- if (value != null) {
- lastCommittedCheckpointId = Long.parseLong(value);
- break;
- }
- }
- Long parentSnapshotId = snapshot.parentId();
- snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
- }
-
- return lastCommittedCheckpointId;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifests;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifestsSerializer;
+import org.apache.inlong.sort.iceberg.sink.FlinkManifestUtil;
+import org.apache.inlong.sort.iceberg.sink.ManifestOutputFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+
+public class IcebergSingleFileCommiter extends IcebergProcessFunction<WriteResult, Void>
+ implements CheckpointedFunction, CheckpointListener {
+ private static final long serialVersionUID = 1L;
+ private static final long INITIAL_CHECKPOINT_ID = -1L;
+ private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergSingleFileCommiter.class);
+ private static final String FLINK_JOB_ID = "flink.job-id";
+
+    // The max checkpoint id we've committed to the iceberg table. Since flink's checkpoint ids are always
+    // increasing, we can safely commit all data files whose checkpoint id is greater than the max committed
+    // one, which avoids committing the same data files twice. This id is attached to the iceberg snapshot
+    // metadata when committing the transaction.
+ private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+ static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
+
+ // TableLoader to load iceberg table lazily.
+ private final TableLoader tableLoader;
+ private final boolean replacePartitions;
+
+    // A sorted map that maintains the completed data files for each pending checkpointId (those not yet
+    // committed to the iceberg table). A sorted map is needed because a few checkpoint snapshots may fail.
+    // For example: the 1st checkpoint has 2 data files <1, <file0, file1>> and the 2nd checkpoint has 1 data
+    // file <2, <file3>>. If the snapshot for checkpoint#1 is interrupted by a network/disk failure etc, we
+    // still must not lose any data in the iceberg table, so we keep the finished files <1, <file0, file1>>
+    // in memory and retry the commit when the next checkpoint happens.
+ private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
+
+    // The completed files cache for the current checkpoint. Once the snapshot barrier is received, it is
+    // flushed into 'dataFilesPerCheckpoint'.
+ private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
+
+    // The unique identifier of this flink job.
+ private transient String flinkJobId;
+ private transient Table table;
+ private transient ManifestOutputFileFactory manifestOutputFileFactory;
+ private transient long maxCommittedCheckpointId;
+ private transient int continuousEmptyCheckpoints;
+ private transient int maxContinuousEmptyCommits;
+    // There are two cases when restoring from flink checkpoints: restoring from a snapshot created by the
+    // same flink job, or from a snapshot created by a different job. For the second case, we need to keep
+    // the old flink job's id in the flink state backend to find the max-committed-checkpoint-id when
+    // traversing the iceberg table's snapshots.
+ private final ListStateDescriptor<String> jobIdDescriptor;
+ private transient ListState<String> jobIdState;
+ // All pending checkpoints states for this function.
+ private final ListStateDescriptor<SortedMap<Long, byte[]>> stateDescriptor;
+ private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
+
+ public IcebergSingleFileCommiter(TableIdentifier tableId, TableLoader tableLoader, boolean replacePartitions) {
+        // The state descriptors must be distinguished by tableId, because all IcebergSingleFileCommiter
+        // instances inside one IcebergMultipleFilesCommiter share the same state store.
+ this.tableLoader = tableLoader;
+ this.replacePartitions = replacePartitions;
+ this.jobIdDescriptor = new ListStateDescriptor<>(
+ String.format("iceberg(%s)-flink-job-id", tableId.toString()), BasicTypeInfo.STRING_TYPE_INFO);
+ this.stateDescriptor = buildStateDescriptor(tableId);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ this.flinkJobId = getRuntimeContext().getJobId().toString();
+
+ // Open the table loader and load the table.
+ this.tableLoader.open();
+ this.table = tableLoader.loadTable();
+
+ maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
+ Preconditions.checkArgument(maxContinuousEmptyCommits > 0,
+ MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive");
+
+ int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+ int attemptId = getRuntimeContext().getAttemptNumber();
+ this.manifestOutputFileFactory = FlinkManifestUtil
+ .createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+ this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+
+ this.checkpointsState = context.getOperatorStateStore().getListState(stateDescriptor);
+ this.jobIdState = context.getOperatorStateStore().getListState(jobIdDescriptor);
+ if (context.isRestored()) {
+ String restoredFlinkJobId = jobIdState.get().iterator().next();
+ Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
+ "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
+
+            // Since flink's checkpoint id starts from max-committed-checkpoint-id + 1 in the new flink job,
+            // even when restoring from a snapshot created by a different flink job, it's safe to assign the
+            // max committed checkpoint id of the restored job to the current flink job.
+ this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
+
+ NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
+ .newTreeMap(checkpointsState.get().iterator().next())
+ .tailMap(maxCommittedCheckpointId, false);
+ if (!uncommittedDataFiles.isEmpty()) {
+                // Commit all uncommitted data files from the old flink job to the iceberg table.
+ long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
+ commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
+ }
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ long checkpointId = context.getCheckpointId();
+ LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);
+
+ // Update the checkpoint state.
+ dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
+ // Reset the snapshot state to the latest state.
+ checkpointsState.clear();
+ checkpointsState.add(dataFilesPerCheckpoint);
+
+ jobIdState.clear();
+ jobIdState.add(flinkJobId);
+
+ // Clear the local buffer for current checkpoint.
+ writeResultsOfCurrentCkpt.clear();
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ // It's possible that we have the following events:
+ // 1. snapshotState(ckpId);
+ // 2. snapshotState(ckpId+1);
+ // 3. notifyCheckpointComplete(ckpId+1);
+ // 4. notifyCheckpointComplete(ckpId);
+        // For step#4, we don't need to commit to the iceberg table again because all the files were
+        // committed in step#3. Besides, the max-committed-checkpoint-id must be kept increasing.
+ if (checkpointId > maxCommittedCheckpointId) {
+ commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
+ this.maxCommittedCheckpointId = checkpointId;
+ }
+ }
+
+ private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
+ String newFlinkJobId,
+ long checkpointId) throws IOException {
+ NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
+ List<ManifestFile> manifests = Lists.newArrayList();
+ NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
+ for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
+ if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
+ // Skip the empty flink manifest.
+ continue;
+ }
+
+ DeltaManifests deltaManifests = SimpleVersionedSerialization
+ .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
+ pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
+ manifests.addAll(deltaManifests.manifests());
+ }
+
+ int totalFiles = pendingResults.values().stream()
+ .mapToInt(r -> r.dataFiles().length + r.deleteFiles().length).sum();
+ continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
+ if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
+ if (replacePartitions) {
+ replacePartitions(pendingResults, newFlinkJobId, checkpointId);
+ } else {
+ commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
+ }
+ continuousEmptyCheckpoints = 0;
+ }
+ pendingMap.clear();
+
+ // Delete the committed manifests.
+ for (ManifestFile manifest : manifests) {
+ try {
+ table.io().deleteFile(manifest.path());
+ } catch (Exception e) {
+ // The flink manifests cleaning failure shouldn't abort the completed checkpoint.
+ String details = MoreObjects.toStringHelper(this)
+ .add("flinkJobId", newFlinkJobId)
+ .add("checkpointId", checkpointId)
+ .add("manifestPath", manifest.path())
+ .toString();
+ LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink "
+ + "manifests: {}", details, e);
+ }
+ }
+ }
+
+ private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId,
+ long checkpointId) {
+ // Partition overwrite does not support delete files.
+ int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
+ Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions with delete files.");
+
+ // Commit the overwrite transaction.
+ ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
+
+ int numFiles = 0;
+ for (WriteResult result : pendingResults.values()) {
+ Preconditions.checkState(result.referencedDataFiles().length == 0,
+ "Should have no referenced data files.");
+
+ numFiles += result.dataFiles().length;
+ Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
+ }
+
+ commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId);
+ }
+
+ private void commitDeltaTxn(
+ NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
+ int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
+
+ if (deleteFilesNum == 0) {
+ // To be compatible with iceberg format V1.
+ AppendFiles appendFiles = table.newAppend();
+
+ int numFiles = 0;
+ for (WriteResult result : pendingResults.values()) {
+ Preconditions.checkState(result.referencedDataFiles().length == 0,
+ "Should have no referenced data files.");
+
+ numFiles += result.dataFiles().length;
+ Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
+ }
+
+ commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
+ } else {
+ // To be compatible with iceberg format V2.
+ for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
+                // We don't commit the merged result in a single transaction because, for sequential
+                // transactions txn1 and txn2, the equality-delete files of txn2 must be applied to the data
+                // files from txn1. Committing the merged result would lead to incorrect delete semantics.
+ WriteResult result = e.getValue();
+
+ // Row delta validations are not needed for streaming changes that write equality deletes. Equality
+ // deletes are applied to data in all previous sequence numbers, so retries may push deletes further in
+ // the future, but do not affect correctness. Position deletes committed to the table in this path are
+ // used only to delete rows from data files that are being added in this commit. There is no way for
+ // data files added along with the delete files to be concurrently removed, so there is no need to
+ // validate the files referenced by the position delete files that are being committed.
+ RowDelta rowDelta = table.newRowDelta();
+
+ int numDataFiles = result.dataFiles().length;
+ Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
+
+ int numDeleteFiles = result.deleteFiles().length;
+ Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
+
+ commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, e.getKey());
+ }
+ }
+ }
+
+ private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int numDeleteFiles, String description,
+ String newFlinkJobId, long checkpointId) {
+ LOG.info("Committing {} with {} data files and {} delete files to table {}", description, numDataFiles,
+ numDeleteFiles, table);
+ operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
+ operation.set(FLINK_JOB_ID, newFlinkJobId);
+
+ long start = System.currentTimeMillis();
+ operation.commit(); // abort is automatically called if this fails.
+ long duration = System.currentTimeMillis() - start;
+ LOG.info("Committed in {} ms", duration);
+ }
+
+ @Override
+    public void processElement(WriteResult value) throws Exception {
+ this.writeResultsOfCurrentCkpt.add(value);
+ }
+
+ @Override
+ public void endInput() throws IOException {
+        // First, flush the buffered data files into 'dataFilesPerCheckpoint'.
+ long currentCheckpointId = Long.MAX_VALUE;
+ dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
+ writeResultsOfCurrentCkpt.clear();
+
+ commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
+ }
+
+ /**
+ * Write all the complete data files to a newly created manifest file and return the manifest's avro serialized
+ * bytes.
+ */
+ private byte[] writeToManifest(long checkpointId) throws IOException {
+ if (writeResultsOfCurrentCkpt.isEmpty()) {
+ return EMPTY_MANIFEST_DATA;
+ }
+
+ WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
+ DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(result,
+ () -> manifestOutputFileFactory.create(checkpointId), table.spec());
+
+ return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, deltaManifests);
+ }
+
+ @Override
+ public void dispose() throws Exception {
+ if (tableLoader != null) {
+ tableLoader.close();
+ }
+ }
+
+ private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor(TableIdentifier tableId) {
+ Comparator<Long> longComparator = Comparators.forType(Types.LongType.get());
+ // Construct a SortedMapTypeInfo.
+ SortedMapTypeInfo<Long, byte[]> sortedMapTypeInfo = new SortedMapTypeInfo<>(
+ BasicTypeInfo.LONG_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, longComparator
+ );
+ return new ListStateDescriptor<>(
+ String.format("iceberg(%s)-files-committer-state", tableId.toString()), sortedMapTypeInfo);
+ }
+
+ static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
+ Snapshot snapshot = table.currentSnapshot();
+ long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+
+ while (snapshot != null) {
+ Map<String, String> summary = snapshot.summary();
+ String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
+ if (flinkJobId.equals(snapshotFlinkJobId)) {
+ String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
+ if (value != null) {
+ lastCommittedCheckpointId = Long.parseLong(value);
+ break;
+ }
+ }
+ Long parentSnapshotId = snapshot.parentId();
+ snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
+ }
+
+ return lastCommittedCheckpointId;
+ }
+}
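
Because every per-table committer inside one IcebergMultipleFilesCommiter shares a single
operator state store, both state descriptors embed the TableIdentifier in their names (see the
constructor and buildStateDescriptor above). The naming convention on its own (the helper class
is illustrative, not part of the connector):

    import org.apache.iceberg.catalog.TableIdentifier;

    final class CommitterStateNames {
        // Mirrors the format strings used by IcebergSingleFileCommiter, so two
        // tables never collide on the same list-state name within one operator.
        static String jobIdStateName(TableIdentifier tableId) {
            return String.format("iceberg(%s)-flink-job-id", tableId);
        }

        static String filesStateName(TableIdentifier tableId) {
            return String.format("iceberg(%s)-files-committer-state", tableId);
        }
    }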
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
similarity index 68%
rename from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
rename to inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
index 6f1b75ea0..1fd8586b3 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
@@ -1,177 +1,176 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.inlong.sort.iceberg.sink;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.iceberg.flink.sink.TaskWriterFactory;
-import org.apache.iceberg.io.TaskWriter;
-import org.apache.iceberg.io.WriteResult;
-import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
-import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
-import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.util.MetricStateUtils;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
-
-/**
- * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
- */
-class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
- implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {
-
- private static final long serialVersionUID = 1L;
-
- private final String fullTableName;
- private final TaskWriterFactory<T> taskWriterFactory;
- private final String inlongMetric;
- private final String auditHostAndPorts;
-
- private transient TaskWriter<T> writer;
- private transient int subTaskId;
- private transient int attemptId;
- @Nullable
- private transient SinkMetricData metricData;
- private transient ListState<MetricState> metricStateListState;
- private transient MetricState metricState;
-
- IcebergStreamWriter(
- String fullTableName,
- TaskWriterFactory<T> taskWriterFactory,
- String inlongMetric,
- String auditHostAndPorts) {
- this.fullTableName = fullTableName;
- this.taskWriterFactory = taskWriterFactory;
- this.inlongMetric = inlongMetric;
- this.auditHostAndPorts = auditHostAndPorts;
- setChainingStrategy(ChainingStrategy.ALWAYS);
- }
-
- @Override
- public void open() {
- this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
- this.attemptId = getRuntimeContext().getAttemptNumber();
-
- // Initialize the task writer factory.
- this.taskWriterFactory.initialize(subTaskId, attemptId);
-
- // Initialize the task writer.
- this.writer = taskWriterFactory.create();
-
- // Initialize metric
- MetricOption metricOption = MetricOption.builder()
- .withInlongLabels(inlongMetric)
- .withInlongAudit(auditHostAndPorts)
- .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
- .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
- .withRegisterMetric(RegisteredMetric.ALL)
- .build();
- if (metricOption != null) {
- metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
- }
- }
-
- @Override
- public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
- // close all open files and emit files to downstream committer operator
- emit(writer.complete());
-
- this.writer = taskWriterFactory.create();
- }
-
- @Override
- public void processElement(StreamRecord<T> element) throws Exception {
- writer.write(element.getValue());
-
- if (metricData != null) {
- metricData.invokeWithEstimate(element.getValue());
- }
- }
-
- @Override
- public void initializeState(StateInitializationContext context) throws Exception {
- super.initializeState(context);
- // init metric state
- if (this.inlongMetric != null) {
- this.metricStateListState = context.getOperatorStateStore().getUnionListState(
- new ListStateDescriptor<>(
- INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
- })));
- }
- if (context.isRestored()) {
- metricState = MetricStateUtils.restoreMetricState(metricStateListState,
- getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
- }
- }
-
- @Override
- public void snapshotState(StateSnapshotContext context) throws Exception {
- super.snapshotState(context);
- if (metricData != null && metricStateListState != null) {
- MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
- getRuntimeContext().getIndexOfThisSubtask());
- }
- }
-
- @Override
- public void dispose() throws Exception {
- super.dispose();
- if (writer != null) {
- writer.close();
- writer = null;
- }
- }
-
- @Override
- public void endInput() throws IOException {
- // For bounded stream, it may don't enable the checkpoint mechanism so we'd better to emit the remaining
- // completed files to downstream before closing the writer so that we won't miss any of them.
- emit(writer.complete());
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("table_name", fullTableName)
- .add("subtask_id", subTaskId)
- .add("attempt_id", attemptId)
- .toString();
- }
-
- private void emit(WriteResult result) {
- output.collect(new StreamRecord<>(result));
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
+public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, WriteResult>
+ implements CheckpointedFunction, SchemaEvolutionFunction<TaskWriterFactory<T>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String fullTableName;
+ private final String inlongMetric;
+ private final String auditHostAndPorts;
+ private TaskWriterFactory<T> taskWriterFactory;
+
+ private transient TaskWriter<T> writer;
+ private transient int subTaskId;
+ private transient int attemptId;
+ @Nullable
+ private transient SinkMetricData metricData;
+ private transient ListState<MetricState> metricStateListState;
+ private transient MetricState metricState;
+
+ public IcebergSingleStreamWriter(
+ String fullTableName,
+ TaskWriterFactory<T> taskWriterFactory,
+ String inlongMetric,
+ String auditHostAndPorts) {
+ this.fullTableName = fullTableName;
+ this.taskWriterFactory = taskWriterFactory;
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+ this.attemptId = getRuntimeContext().getAttemptNumber();
+
+ // Initialize the task writer factory.
+ this.taskWriterFactory.initialize(subTaskId, attemptId);
+
+ // Initialize the task writer.
+ this.writer = taskWriterFactory.create();
+
+ // Initialize metric
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(auditHostAndPorts)
+ .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+ .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withRegisterMetric(RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+ }
+ }
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ // close all open files and emit files to downstream committer operator
+ emit(writer.complete());
+
+ this.writer = taskWriterFactory.create();
+ }
+
+ @Override
+ public void processElement(T value)
+ throws Exception {
+ writer.write(value);
+
+ if (metricData != null) {
+ metricData.invokeWithEstimate(value);
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ // init metric state
+ if (this.inlongMetric != null) {
+ this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+ new ListStateDescriptor<>(
+ INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+ })));
+ }
+ if (context.isRestored()) {
+ metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+ getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ if (metricData != null && metricStateListState != null) {
+ MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
+ }
+
+ @Override
+ public void dispose() throws Exception {
+ if (writer != null) {
+ writer.close();
+ writer = null;
+ }
+ }
+
+ @Override
+ public void endInput() throws IOException {
+ // For a bounded stream, the checkpoint mechanism may not be enabled, so emit the remaining completed
+ // files downstream before closing the writer to make sure none of them are missed.
+ emit(writer.complete());
+ }
+
+ @Override
+ public void schemaEvolution(TaskWriterFactory<T> schema) throws IOException {
+ emit(writer.complete());
+
+ taskWriterFactory = schema;
+ taskWriterFactory.initialize(subTaskId, attemptId);
+ writer = taskWriterFactory.create();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("table_name", fullTableName)
+ .add("subtask_id", subTaskId)
+ .add("attempt_id", attemptId)
+ .toString();
+ }
+
+ private void emit(WriteResult result) {
+ collector.collect(result);
+ }
+}
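
IcebergSingleStreamWriter keeps exactly one open TaskWriter and rotates it at every flush point: before a checkpoint barrier (prepareSnapshotPreBarrier), at end of input, and on schema evolution, where pending files are flushed under the old schema before the factory is swapped. A condensed sketch of that rotation, not part of the patch and with illustrative names (the Consumer stands in for the Flink output collector):

    import java.io.IOException;
    import java.util.function.Consumer;
    import org.apache.iceberg.flink.sink.TaskWriterFactory;
    import org.apache.iceberg.io.TaskWriter;
    import org.apache.iceberg.io.WriteResult;

    final class WriterRotationSketch<T> {
        private TaskWriterFactory<T> factory;
        private TaskWriter<T> writer;
        private final Consumer<WriteResult> downstream;
        private final int subTaskId;
        private final int attemptId;

        WriterRotationSketch(TaskWriterFactory<T> factory, Consumer<WriteResult> downstream,
                int subTaskId, int attemptId) {
            this.factory = factory;
            this.downstream = downstream;
            this.subTaskId = subTaskId;
            this.attemptId = attemptId;
            factory.initialize(subTaskId, attemptId);
            this.writer = factory.create();
        }

        void write(T record) throws IOException {
            writer.write(record);
        }

        // Before the barrier: hand the completed files to the committer, start fresh.
        void flushForCheckpoint() throws IOException {
            downstream.accept(writer.complete());
            writer = factory.create();
        }

        // On schema change: flush under the old schema, then swap in the new factory.
        void evolve(TaskWriterFactory<T> newFactory) throws IOException {
            downstream.accept(writer.complete());
            factory = newFactory;
            factory.initialize(subTaskId, attemptId);
            writer = factory.create();
        }
    }
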
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/MultipleWriteResult.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/MultipleWriteResult.java
new file mode 100644
index 000000000..4b004a140
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/MultipleWriteResult.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.WriteResult;
+
+public class MultipleWriteResult {
+ private TableIdentifier tableId;
+
+ private WriteResult writeResult;
+
+ public MultipleWriteResult(TableIdentifier tableId, WriteResult writeResult) {
+ this.tableId = tableId;
+ this.writeResult = writeResult;
+ }
+
+ public TableIdentifier getTableId() {
+ return tableId;
+ }
+
+ public WriteResult getWriteResult() {
+ return writeResult;
+ }
+}
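
MultipleWriteResult simply tags a WriteResult with the table it belongs to, which is what lets a single committer operator fan results out per table. As an illustration only (this helper is not part of the patch and assumes the same package as MultipleWriteResult), a downstream step could bucket results like so:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.io.WriteResult;

    final class WriteResultRouting {
        // Groups mixed-table results so each table can be committed independently.
        static Map<TableIdentifier, List<WriteResult>> byTable(List<MultipleWriteResult> results) {
            return results.stream().collect(Collectors.groupingBy(
                    MultipleWriteResult::getTableId,
                    Collectors.mapping(MultipleWriteResult::getWriteResult, Collectors.toList())));
        }
    }
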
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java
new file mode 100644
index 000000000..bc08a4e40
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types.BinaryType;
+import org.apache.iceberg.types.Types.BooleanType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.FloatType;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.types.Types.TimeType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.types.Types.UUIDType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+public class RecordWithSchema {
+
+ public RecordWithSchema(
+ JsonNode originalData,
+ Schema schema,
+ TableIdentifier tableId,
+ List<String> primaryKeys) {
+ this.originalData = originalData;
+ this.schema = schema;
+ this.tableId = tableId;
+ this.primaryKeys = primaryKeys;
+ }
+
+ private transient JsonNode originalData;
+
+ private List<RowData> data;
+
+ private Schema schema;
+
+ private final TableIdentifier tableId;
+
+ private final List<String> primaryKeys;
+
+ public List<RowData> getData() {
+ return data;
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public TableIdentifier getTableId() {
+ return tableId;
+ }
+
+ public List<String> getPrimaryKeys() {
+ return primaryKeys;
+ }
+
+ public RecordWithSchema refreshFieldId(Schema newSchema) {
+ // The schema parsed from the data carries no field IDs, so re-select the fields
+ // from the catalog-loaded schema to pick up the real IDs
+ schema = newSchema.select(schema.columns().stream().map(NestedField::name).collect(Collectors.toList()));
+ return this;
+ }
+
+ public RecordWithSchema refreshRowData(BiFunction<JsonNode, Schema, List<RowData>> rowDataExtractor) {
+ // Avoid type errors during downstream parsing: materialize rowData so that it is compatible with the rowType
+ data = rowDataExtractor.apply(originalData, schema);
+ return this;
+ }
+
+ // TODO: when a RecordWithSchema is deserialized from the network, its `Schema` is a new object while its
+ // `Type`s are logically the same. However, `Type` does not implement equals(), so comparing this schema
+ // with the Table#schema loaded from the catalog gives unexpected results: Schema#sameSchema returns false
+ // even though the schemas match, and the comparators for a Type cannot be obtained even though the types match.
+ public void replaceSchema() {
+ List<NestedField> columns = schema.columns();
+ List<NestedField> newColumns = new ArrayList<>();
+ for (int i = 0; i < columns.size(); i++) {
+ newColumns.add(replaceField(columns.get(i)));
+ }
+ schema = new Schema(schema.schemaId(), newColumns, schema.getAliases(), schema.identifierFieldIds());
+ }
+
+ private static NestedField replaceField(NestedField nestedField) {
+ return NestedField.of(
+ nestedField.fieldId(),
+ nestedField.isOptional(),
+ nestedField.name(),
+ replaceType(nestedField.type()),
+ nestedField.doc());
+ }
+
+ private static Type replaceType(Type type) {
+ switch (type.typeId()) {
+ case BOOLEAN:
+ return BooleanType.get();
+ case INTEGER:
+ return IntegerType.get();
+ case LONG:
+ return LongType.get();
+ case FLOAT:
+ return FloatType.get();
+ case DOUBLE:
+ return DoubleType.get();
+ case DATE:
+ return DateType.get();
+ case TIME:
+ return TimeType.get();
+ case TIMESTAMP:
+ return ((TimestampType) type).shouldAdjustToUTC()
+ ? TimestampType.withZone() : TimestampType.withoutZone();
+ case STRING:
+ return StringType.get();
+ case UUID:
+ return UUIDType.get();
+ case BINARY:
+ return BinaryType.get();
+ case FIXED:
+ case DECIMAL:
+ return type;
+ case STRUCT:
+ return StructType.of(
+ ((StructType) type).fields()
+ .stream()
+ .map(RecordWithSchema::replaceField)
+ .collect(Collectors.toList()));
+ case LIST:
+ ListType listType = ((ListType) type);
+ return listType.isElementOptional()
+ ? ListType.ofOptional(listType.elementId(), replaceType(listType.elementType()))
+ : ListType.ofRequired(listType.elementId(), replaceType(listType.elementType()));
+ case MAP:
+ MapType mapType = ((MapType) type);
+ return mapType.isValueOptional()
+ ? MapType.ofOptional(mapType.keyId(), mapType.valueId(),
+ replaceType(mapType.keyType()), replaceType(mapType.valueType()))
+ : MapType.ofRequired(mapType.keyId(), mapType.valueId(),
+ replaceType(mapType.keyType()), replaceType(mapType.valueType()));
+ default:
+ throw new UnsupportedOperationException("Unspported type: " + type);
+ }
+ }
+}
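
The intended call order for RecordWithSchema is: build it from the parsed JSON, copy the real field IDs in from the catalog schema, materialize RowData against that id-bearing schema, then normalize the types. A sketch of that sequence, assuming the same package as RecordWithSchema; the table identifier, primary key, and extractor lambda are placeholders, not APIs from this patch:

    import java.util.Collections;
    import java.util.List;
    import java.util.function.BiFunction;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;

    final class RecordPreparationSketch {
        static RecordWithSchema prepare(JsonNode raw, Schema parsedSchema, Table catalogTable,
                BiFunction<JsonNode, Schema, List<RowData>> rowDataExtractor) {
            RecordWithSchema record = new RecordWithSchema(
                    raw, parsedSchema, TableIdentifier.of("db", "tbl"),
                    Collections.singletonList("id"));
            record.refreshFieldId(catalogTable.schema()); // field IDs come from the catalog
            record.refreshRowData(rowDataExtractor);      // JSON -> RowData against that schema
            record.replaceSchema();                       // canonical Types for safe comparison
            return record;
        }
    }
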
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
new file mode 100644
index 000000000..b73d27519
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.sink.TableChange;
+import org.apache.inlong.sort.iceberg.FlinkTypeToType;
+import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
+import org.apache.inlong.sort.base.sink.TableChange.ColumnPosition;
+import org.apache.inlong.sort.base.sink.TableChange.UnknownColumnChange;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SchemaChangeUtils {
+ private static final Joiner DOT = Joiner.on(".");
+
+ /**
+ * Compare two schemas and derive the changes between them.
+ * TODO: currently only adding columns is supported
+ *
+ * @param oldSchema the schema currently attached to the table
+ * @param newSchema the schema parsed from the incoming data
+ * @return the changes that evolve {@code oldSchema} into {@code newSchema}
+ */
+ static List<TableChange> diffSchema(Schema oldSchema, Schema newSchema) {
+ List<String> oldFields = oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
+ List<String> newFields = newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
+ int oi = 0;
+ int ni = 0;
+ List<TableChange> tableChanges = new ArrayList<>();
+ while (ni < newFields.size()) {
+ if (oi < oldFields.size() && oldFields.get(oi).equals(newFields.get(ni))) {
+ oi++;
+ ni++;
+ } else {
+ NestedField newField = newSchema.findField(newFields.get(ni));
+ tableChanges.add(
+ new AddColumn(
+ new String[]{newField.name()},
+ FlinkSchemaUtil.convert(newField.type()),
+ !newField.isRequired(),
+ newField.doc(),
+ ni == 0 ? ColumnPosition.first() : ColumnPosition.after(newFields.get(ni - 1))));
+ ni++;
+ }
+ }
+
+ if (oi != oldFields.size()) {
+ tableChanges.clear();
+ tableChanges.add(
+ new UnknownColumnChange(
+ String.format("Unsupported schema update.\n"
+ + "oldSchema:\n%s\n, newSchema:\n %s", oldSchema, newSchema)));
+ }
+
+ return tableChanges;
+ }
+
+ public static void applySchemaChanges(UpdateSchema pendingUpdate, List<TableChange> tableChanges) {
+ for (TableChange change : tableChanges) {
+ if (change instanceof TableChange.AddColumn) {
+ apply(pendingUpdate, (TableChange.AddColumn) change);
+ } else {
+ throw new UnsupportedOperationException("Cannot apply unknown table change: " + change);
+ }
+ }
+ pendingUpdate.commit();
+ }
+
+ public static void apply(UpdateSchema pendingUpdate, TableChange.AddColumn add) {
+ Preconditions.checkArgument(add.isNullable(),
+ "Incompatible change: cannot add required column: %s", leafName(add.fieldNames()));
+ Type type = add.dataType().accept(new FlinkTypeToType(RowType.of(add.dataType())));
+ pendingUpdate.addColumn(parentName(add.fieldNames()), leafName(add.fieldNames()), type, add.comment());
+
+ if (add.position() instanceof TableChange.After) {
+ TableChange.After after = (TableChange.After) add.position();
+ String referenceField = peerName(add.fieldNames(), after.column());
+ pendingUpdate.moveAfter(DOT.join(add.fieldNames()), referenceField);
+
+ } else if (add.position() instanceof TableChange.First) {
+ pendingUpdate.moveFirst(DOT.join(add.fieldNames()));
+
+ } else {
+ Preconditions.checkArgument(add.position() == null,
+ "Cannot add '%s' at unknown position: %s", DOT.join(add.fieldNames()), add.position());
+ }
+ }
+
+ public static String leafName(String[] fieldNames) {
+ Preconditions.checkArgument(fieldNames.length > 0, "Invalid field name: at least one name is required");
+ return fieldNames[fieldNames.length - 1];
+ }
+
+ public static String peerName(String[] fieldNames, String fieldName) {
+ if (fieldNames.length > 1) {
+ String[] peerNames = Arrays.copyOf(fieldNames, fieldNames.length);
+ peerNames[fieldNames.length - 1] = fieldName;
+ return DOT.join(peerNames);
+ }
+ return fieldName;
+ }
+
+ public static String parentName(String[] fieldNames) {
+ if (fieldNames.length > 1) {
+ return DOT.join(Arrays.copyOfRange(fieldNames, 0, fieldNames.length - 1));
+ }
+ return null;
+ }
+}
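
Putting diffSchema and applySchemaChanges together, the evolution step for one table reduces to: diff the catalog schema against the incoming one, and commit the changes if they are all supported. A sketch under two assumptions, that it lives in the same package as SchemaChangeUtils (diffSchema is package-private) and that the error policy shown is the caller's choice:

    import java.util.List;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.inlong.sort.base.sink.TableChange;

    final class EvolveTableSketch {
        static void evolve(Table table, Schema incoming) {
            List<TableChange> changes = SchemaChangeUtils.diffSchema(table.schema(), incoming);
            boolean supported = changes.stream().allMatch(c -> c instanceof TableChange.AddColumn);
            if (supported) {
                // addColumn plus optional move, then a single metadata commit
                SchemaChangeUtils.applySchemaChanges(table.updateSchema(), changes);
            } else {
                // An unsupported update surfaces as one UnknownColumnChange;
                // the caller decides whether to fail or skip.
                throw new UnsupportedOperationException(changes.toString());
            }
        }
    }
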
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaEvolutionFunction.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaEvolutionFunction.java
new file mode 100644
index 000000000..aee6ed13e
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaEvolutionFunction.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+public interface SchemaEvolutionFunction<Schema> {
+ void schemaEvolution(Schema schema) throws Exception;
+}
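
Note that `Schema` here is just the name of the type parameter, not org.apache.iceberg.Schema: IcebergSingleStreamWriter above binds it to TaskWriterFactory<T>, so the callback receives whatever artifact the implementor needs to rebuild after an evolution. A trivial, purely illustrative implementor:

    import org.apache.iceberg.Schema;

    // Hypothetical implementor that just logs; real ones rebuild writers or converters.
    class LoggingSchemaEvolution implements SchemaEvolutionFunction<Schema> {
        @Override
        public void schemaEvolution(Schema schema) {
            System.out.println("schema evolved to: " + schema);
        }
    }
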