You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/10/25 08:23:11 UTC

[inlong] branch master updated: [INLONG-6274][Sort] Support multiple sink for IcebergLoadNode (#6215)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f8b1efa7a [INLONG-6274][Sort] Support multiple sink for IcebergLoadNode (#6215)
f8b1efa7a is described below

commit f8b1efa7aeea207bb6a48b57833c990c96cee28b
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Tue Oct 25 16:23:05 2022 +0800

    [INLONG-6274][Sort] Support multiple sink for IcebergLoadNode (#6215)
---
 .../sort/base/format/JsonDynamicSchemaFormat.java  |   2 +-
 inlong-sort/sort-connectors/iceberg/pom.xml        |   8 +
 .../sort/iceberg/FlinkDynamicTableFactory.java     |  46 +-
 .../inlong/sort/iceberg/FlinkTypeToType.java       | 201 ++++++
 .../inlong/sort/iceberg/IcebergTableSink.java      |  55 +-
 .../inlong/sort/iceberg/sink/DeltaManifests.java   |  14 +-
 .../iceberg/sink/DeltaManifestsSerializer.java     |   4 +-
 .../sort/iceberg/sink/FlinkManifestUtil.java       |  12 +-
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java | 134 +++-
 .../iceberg/sink/ManifestOutputFileFactory.java    |   4 +-
 .../iceberg/sink/RowDataTaskWriterFactory.java     |   3 +-
 .../sink/multiple/DynamicSchemaHandleOperator.java | 293 ++++++++
 .../multiple/IcebergMultipleFilesCommiter.java     | 106 +++
 .../sink/multiple/IcebergMultipleStreamWriter.java | 207 ++++++
 .../sink/multiple/IcebergProcessFunction.java      | 107 +++
 .../sink/multiple/IcebergProcessOperator.java      | 144 ++++
 .../IcebergSingleFileCommiter.java}                | 785 +++++++++++----------
 .../IcebergSingleStreamWriter.java}                | 353 +++++----
 .../iceberg/sink/multiple/MultipleWriteResult.java |  41 ++
 .../iceberg/sink/multiple/RecordWithSchema.java    | 171 +++++
 .../iceberg/sink/multiple/SchemaChangeUtils.java   | 136 ++++
 .../sink/multiple/SchemaEvolutionFunction.java     |  23 +
 22 files changed, 2229 insertions(+), 620 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index 3ad9117f7..f2527d253 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -96,7 +96,7 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
 
     protected JsonDynamicSchemaFormat() {
         this.rowDataConverters =
-                new JsonToRowDataConverters(true, false, TimestampFormat.SQL);
+                new JsonToRowDataConverters(true, false, TimestampFormat.ISO_8601);
     }
 
     /**
diff --git a/inlong-sort/sort-connectors/iceberg/pom.xml b/inlong-sort/sort-connectors/iceberg/pom.xml
index 115fc0a69..b5ade006f 100644
--- a/inlong-sort/sort-connectors/iceberg/pom.xml
+++ b/inlong-sort/sort-connectors/iceberg/pom.xml
@@ -101,6 +101,14 @@
                                     </includes>
                                 </filter>
                             </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.base</pattern>
+                                    <shadedPattern>
+                                        org.apache.inlong.sort.iceberg.shaded.org.apache.inlong.sort.base
+                                    </shadedPattern>
+                                </relocation>
+                            </relocations>
                         </configuration>
                     </execution>
                 </executions>
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index f12bd1b56..5d63d8dc9 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -39,6 +39,7 @@ import org.apache.flink.util.Preconditions;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.IcebergTableSource;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -49,6 +50,12 @@ import java.util.Set;
 
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ADD_COLUMN_POLICY;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DEL_COLUMN_POLICY;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
 
 /**
@@ -109,6 +116,17 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
         return GET_CATALOG_TABLE.invoke(context);
     }
 
+    private static CatalogLoader createCatalogLoader(Map<String, String> tableProps) {
+        Configuration flinkConf = new Configuration();
+        tableProps.forEach(flinkConf::setString);
+
+        String catalogName = flinkConf.getString(CATALOG_NAME);
+        Preconditions.checkNotNull(catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
+
+        org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
+        return FlinkCatalogFactory.createCatalogLoader(catalogName, tableProps, hadoopConf);
+    }
+
     private static TableLoader createTableLoader(CatalogBaseTable catalogBaseTable,
             Map<String, String> tableProps,
             String databaseName,
@@ -185,15 +203,21 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
         Map<String, String> tableProps = catalogTable.getOptions();
         TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
 
-        TableLoader tableLoader;
-        if (catalog != null) {
-            tableLoader = createTableLoader(catalog, objectPath);
+        boolean multipleSink = Boolean.parseBoolean(
+                tableProps.getOrDefault(SINK_MULTIPLE_ENABLE.key(), SINK_MULTIPLE_ENABLE.defaultValue().toString()));
+        if (multipleSink) {
+            CatalogLoader catalogLoader = createCatalogLoader(tableProps);
+            return new IcebergTableSink(null, tableSchema, catalogTable, catalogLoader);
         } else {
-            tableLoader = createTableLoader(catalogTable, tableProps, objectPath.getDatabaseName(),
-                    objectPath.getObjectName());
+            TableLoader tableLoader;
+            if (catalog != null) {
+                tableLoader = createTableLoader(catalog, objectPath);
+            } else {
+                tableLoader = createTableLoader(catalogTable, tableProps, objectPath.getDatabaseName(),
+                        objectPath.getObjectName());
+            }
+            return new IcebergTableSink(tableLoader, tableSchema, catalogTable, null);
         }
-
-        return new IcebergTableSink(tableLoader, tableSchema, catalogTable);
     }
 
     @Override
@@ -212,6 +236,13 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
         options.add(ICEBERG_IGNORE_ALL_CHANGELOG);
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
+
+        options.add(SINK_MULTIPLE_ENABLE);
+        options.add(SINK_MULTIPLE_FORMAT);
+        options.add(SINK_MULTIPLE_DATABASE_PATTERN);
+        options.add(SINK_MULTIPLE_TABLE_PATTERN);
+        options.add(SINK_MULTIPLE_ADD_COLUMN_POLICY);
+        options.add(SINK_MULTIPLE_DEL_COLUMN_POLICY);
         return options;
     }
 
@@ -220,3 +251,4 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
         return FACTORY_IDENTIFIER;
     }
 }
+
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkTypeToType.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkTypeToType.java
new file mode 100644
index 000000000..f18d99b3d
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkTypeToType.java
@@ -0,0 +1,201 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.iceberg.flink.FlinkTypeVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FlinkTypeToType extends FlinkTypeVisitor<Type> {
+
+    private final RowType root;
+    private int nextId;
+
+    public FlinkTypeToType(RowType root) {
+        this.root = root;
+        // the root struct's fields use the first ids
+        this.nextId = root.getFieldCount();
+    }
+
+    private int getNextId() {
+        int next = nextId;
+        nextId += 1;
+        return next;
+    }
+
+    @Override
+    public Type visit(CharType charType) {
+        return Types.StringType.get();
+    }
+
+    @Override
+    public Type visit(VarCharType varCharType) {
+        return Types.StringType.get();
+    }
+
+    @Override
+    public Type visit(BooleanType booleanType) {
+        return Types.BooleanType.get();
+    }
+
+    @Override
+    public Type visit(BinaryType binaryType) {
+        return Types.FixedType.ofLength(binaryType.getLength());
+    }
+
+    @Override
+    public Type visit(VarBinaryType varBinaryType) {
+        return Types.BinaryType.get();
+    }
+
+    @Override
+    public Type visit(DecimalType decimalType) {
+        return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
+    }
+
+    @Override
+    public Type visit(TinyIntType tinyIntType) {
+        return Types.IntegerType.get();
+    }
+
+    @Override
+    public Type visit(SmallIntType smallIntType) {
+        return Types.IntegerType.get();
+    }
+
+    @Override
+    public Type visit(IntType intType) {
+        return Types.IntegerType.get();
+    }
+
+    @Override
+    public Type visit(BigIntType bigIntType) {
+        return Types.LongType.get();
+    }
+
+    @Override
+    public Type visit(FloatType floatType) {
+        return Types.FloatType.get();
+    }
+
+    @Override
+    public Type visit(DoubleType doubleType) {
+        return Types.DoubleType.get();
+    }
+
+    @Override
+    public Type visit(DateType dateType) {
+        return Types.DateType.get();
+    }
+
+    @Override
+    public Type visit(TimeType timeType) {
+        return Types.TimeType.get();
+    }
+
+    @Override
+    public Type visit(TimestampType timestampType) {
+        return Types.TimestampType.withoutZone();
+    }
+
+    @Override
+    public Type visit(LocalZonedTimestampType localZonedTimestampType) {
+        return Types.TimestampType.withZone();
+    }
+
+    @Override
+    public Type visit(ArrayType arrayType) {
+        Type elementType = arrayType.getElementType().accept(this);
+        if (arrayType.getElementType().isNullable()) {
+            return Types.ListType.ofOptional(getNextId(), elementType);
+        } else {
+            return Types.ListType.ofRequired(getNextId(), elementType);
+        }
+    }
+
+    @Override
+    public Type visit(MultisetType multisetType) {
+        Type elementType = multisetType.getElementType().accept(this);
+        return Types.MapType.ofRequired(getNextId(), getNextId(), elementType, Types.IntegerType.get());
+    }
+
+    @Override
+    public Type visit(MapType mapType) {
+        // keys in map are not allowed to be null.
+        Type keyType = mapType.getKeyType().accept(this);
+        Type valueType = mapType.getValueType().accept(this);
+        if (mapType.getValueType().isNullable()) {
+            return Types.MapType.ofOptional(getNextId(), getNextId(), keyType, valueType);
+        } else {
+            return Types.MapType.ofRequired(getNextId(), getNextId(), keyType, valueType);
+        }
+    }
+
+    @Override
+    @SuppressWarnings("ReferenceEquality")
+    public Type visit(RowType rowType) {
+        List<NestedField> newFields = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
+        boolean isRoot = root == rowType;
+
+        List<Type> types = rowType.getFields().stream()
+                .map(f -> f.getType().accept(this))
+                .collect(Collectors.toList());
+
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            int id = isRoot ? i : getNextId();
+
+            RowType.RowField field = rowType.getFields().get(i);
+            String name = field.getName();
+            String comment = field.getDescription().orElse(null);
+
+            if (field.getType().isNullable()) {
+                newFields.add(Types.NestedField.optional(id, name, types.get(i), comment));
+            } else {
+                newFields.add(Types.NestedField.required(id, name, types.get(i), comment));
+            }
+        }
+
+        return Types.StructType.of(newFields);
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index 37eb2670b..8296d02ff 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -19,6 +19,8 @@
 
 package org.apache.inlong.sort.iceberg;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -29,18 +31,25 @@ import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
 import org.apache.inlong.sort.iceberg.sink.FlinkSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ADD_COLUMN_POLICY;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DEL_COLUMN_POLICY;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
 
 /**
@@ -56,6 +65,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
     private final TableSchema tableSchema;
 
     private final CatalogTable catalogTable;
+    private final CatalogLoader catalogLoader;
 
     private boolean overwrite = false;
 
@@ -64,12 +74,15 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
         this.tableSchema = toCopy.tableSchema;
         this.overwrite = toCopy.overwrite;
         this.catalogTable = toCopy.catalogTable;
+        this.catalogLoader = toCopy.catalogLoader;
     }
 
-    public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema, CatalogTable catalogTable) {
+    public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema, CatalogTable catalogTable,
+            CatalogLoader catalogLoader) {
         this.tableLoader = tableLoader;
         this.tableSchema = tableSchema;
         this.catalogTable = catalogTable;
+        this.catalogLoader = catalogLoader;
     }
 
     @Override
@@ -81,17 +94,33 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
                 .map(UniqueConstraint::getColumns)
                 .orElseGet(ImmutableList::of);
 
-        return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
-                .tableLoader(tableLoader)
-                .tableSchema(tableSchema)
-                .equalityFieldColumns(equalityColumns)
-                .overwrite(overwrite)
-                .appendMode(Boolean.valueOf(
-                        Optional.ofNullable(catalogTable.getOptions().get(ICEBERG_IGNORE_ALL_CHANGELOG.key()))
-                                .orElse(ICEBERG_IGNORE_ALL_CHANGELOG.defaultValue().toString())))
-                .metric(catalogTable.getOptions().getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()),
-                        catalogTable.getOptions().getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()))
-                .append();
+        final ReadableConfig tableOptions = Configuration.fromMap(catalogTable.getOptions());
+        boolean multipleSink = tableOptions.get(SINK_MULTIPLE_ENABLE);
+        if (multipleSink) {
+            return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
+                    .overwrite(overwrite)
+                    .appendMode(tableOptions.get(ICEBERG_IGNORE_ALL_CHANGELOG))
+                    .metric(tableOptions.get(INLONG_METRIC), tableOptions.get(INLONG_AUDIT))
+                    .catalogLoader(catalogLoader)
+                    .multipleSink(tableOptions.get(SINK_MULTIPLE_ENABLE))
+                    .multipleSinkOption(MultipleSinkOption.builder()
+                            .withFormat(tableOptions.get(SINK_MULTIPLE_FORMAT))
+                            .withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN))
+                            .withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN))
+                            .withAddColumnPolicy(tableOptions.get(SINK_MULTIPLE_ADD_COLUMN_POLICY))
+                            .withDelColumnPolicy(tableOptions.get(SINK_MULTIPLE_DEL_COLUMN_POLICY))
+                            .build())
+                    .append();
+        } else {
+            return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
+                    .tableLoader(tableLoader)
+                    .tableSchema(tableSchema)
+                    .equalityFieldColumns(equalityColumns)
+                    .overwrite(overwrite)
+                    .appendMode(tableOptions.get(ICEBERG_IGNORE_ALL_CHANGELOG))
+                    .metric(tableOptions.get(INLONG_METRIC), tableOptions.get(INLONG_AUDIT))
+                    .append();
+        }
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
index dae992042..4ec9683df 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
@@ -28,7 +28,7 @@ import java.util.List;
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
-class DeltaManifests {
+public class DeltaManifests {
 
     private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0];
 
@@ -36,11 +36,11 @@ class DeltaManifests {
     private final ManifestFile deleteManifest;
     private final CharSequence[] referencedDataFiles;
 
-    DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) {
+    public DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) {
         this(dataManifest, deleteManifest, EMPTY_REF_DATA_FILES);
     }
 
-    DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest, CharSequence[] referencedDataFiles) {
+    public DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest, CharSequence[] referencedDataFiles) {
         Preconditions.checkNotNull(referencedDataFiles, "Referenced data files shouldn't be null.");
 
         this.dataManifest = dataManifest;
@@ -48,19 +48,19 @@ class DeltaManifests {
         this.referencedDataFiles = referencedDataFiles;
     }
 
-    ManifestFile dataManifest() {
+    public ManifestFile dataManifest() {
         return dataManifest;
     }
 
-    ManifestFile deleteManifest() {
+    public ManifestFile deleteManifest() {
         return deleteManifest;
     }
 
-    CharSequence[] referencedDataFiles() {
+    public CharSequence[] referencedDataFiles() {
         return referencedDataFiles;
     }
 
-    List<ManifestFile> manifests() {
+    public List<ManifestFile> manifests() {
         List<ManifestFile> manifests = Lists.newArrayListWithCapacity(2);
         if (dataManifest != null) {
             manifests.add(dataManifest);
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
index 1ae2bcbe5..4a53ab574 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
@@ -33,12 +33,12 @@ import java.io.IOException;
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
-class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
+public class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
     private static final int VERSION_1 = 1;
     private static final int VERSION_2 = 2;
     private static final byte[] EMPTY_BINARY = new byte[0];
 
-    static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer();
+    public static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer();
 
     @Override
     public int getVersion() {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
index 46d7beec9..3a2cc8fff 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
@@ -41,14 +41,14 @@ import java.util.function.Supplier;
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
-class FlinkManifestUtil {
+public class FlinkManifestUtil {
     private static final int FORMAT_V2 = 2;
     private static final Long DUMMY_SNAPSHOT_ID = 0L;
 
     private FlinkManifestUtil() {
     }
 
-    static ManifestFile writeDataFiles(OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles)
+    public static ManifestFile writeDataFiles(OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles)
             throws IOException {
         ManifestWriter<DataFile> writer = ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
 
@@ -59,19 +59,19 @@ class FlinkManifestUtil {
         return writer.toManifestFile();
     }
 
-    static List<DataFile> readDataFiles(ManifestFile manifestFile, FileIO io) throws IOException {
+    public static List<DataFile> readDataFiles(ManifestFile manifestFile, FileIO io) throws IOException {
         try (CloseableIterable<DataFile> dataFiles = ManifestFiles.read(manifestFile, io)) {
             return Lists.newArrayList(dataFiles);
         }
     }
 
-    static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
+    public static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
             long attemptNumber) {
         TableOperations ops = ((HasTableOperations) table).operations();
         return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
     }
 
-    static DeltaManifests writeCompletedFiles(WriteResult result,
+    public static DeltaManifests writeCompletedFiles(WriteResult result,
             Supplier<OutputFile> outputFileSupplier,
             PartitionSpec spec) throws IOException {
 
@@ -101,7 +101,7 @@ class FlinkManifestUtil {
         return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
     }
 
-    static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io) throws IOException {
+    public static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io) throws IOException {
         WriteResult.Builder builder = WriteResult.builder();
 
         // Read the completed data files from persisted data manifest file.
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 8662722d8..bb7498650 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -40,6 +40,8 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.sink.TaskWriterFactory;
@@ -49,6 +51,15 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergMultipleFilesCommiter;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergMultipleStreamWriter;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergProcessOperator;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergSingleFileCommiter;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergSingleStreamWriter;
+import org.apache.inlong.sort.iceberg.sink.multiple.MultipleWriteResult;
+import org.apache.inlong.sort.iceberg.sink.multiple.RecordWithSchema;
+import org.apache.inlong.sort.iceberg.sink.multiple.DynamicSchemaHandleOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,8 +87,14 @@ import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DE
 public class FlinkSink {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
 
-    private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
-    private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();
+    private static final String ICEBERG_STREAM_WRITER_NAME = IcebergSingleStreamWriter.class.getSimpleName();
+    private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergSingleFileCommiter.class.getSimpleName();
+    private static final String ICEBERG_MULTIPLE_STREAM_WRITER_NAME =
+            IcebergMultipleStreamWriter.class.getSimpleName();
+    private static final String ICEBERG_MULTIPLE_FILES_COMMITTER_NAME =
+            IcebergMultipleFilesCommiter.class.getSimpleName();
+    private static final String ICEBERG_WHOLE_DATABASE_MIGRATION_NAME =
+            DynamicSchemaHandleOperator.class.getSimpleName();
 
     private FlinkSink() {
     }
@@ -141,6 +158,9 @@ public class FlinkSink {
         private String uidPrefix = null;
         private String inlongMetric = null;
         private String auditHostAndPorts = null;
+        private CatalogLoader catalogLoader = null;
+        private boolean multipleSink = false;
+        private MultipleSinkOption multipleSinkOption = null;
 
         private Builder() {
         }
@@ -166,9 +186,9 @@ public class FlinkSink {
         }
 
         /**
-         * This iceberg {@link Table} instance is used for initializing {@link IcebergStreamWriter} which will write all
-         * the records into {@link DataFile}s and emit them to downstream operator. Providing a table would avoid so
-         * many table loading from each separate task.
+         * This iceberg {@link Table} instance is used for initializing {@link IcebergSingleStreamWriter} which will
+         * write all the records into {@link DataFile}s and emit them to downstream operator. Providing a table
+         * avoids loading the table again in each separate task.
          *
          * @param newTable the loaded iceberg table instance.
          * @return {@link Builder} to connect the iceberg table.
@@ -179,7 +199,7 @@ public class FlinkSink {
         }
 
         /**
-         * The table loader is used for loading tables in {@link IcebergFilesCommitter} lazily, we need this loader
+         * The table loader is used for loading tables in {@link IcebergSingleFileCommiter} lazily, we need this loader
          * because {@link Table} is not serializable and could not just use the loaded table from Builder#table in the
          * remote task manager.
          *
@@ -196,6 +216,29 @@ public class FlinkSink {
             return this;
         }
 
+        /**
+         * The catalog loader is used for loading tables in {@link IcebergMultipleStreamWriter} and
+         * {@link DynamicSchemaHandleOperator} lazily. In the multiple sink scenario the table to load is only
+         * determined at runtime, so the tasks need to hold a {@link org.apache.iceberg.catalog.Catalog} at runtime.
+         *
+         * @param catalogLoader to load iceberg catalog inside tasks.
+         * @return {@link Builder} to connect the iceberg table.
+         */
+        public Builder catalogLoader(CatalogLoader catalogLoader) {
+            this.catalogLoader = catalogLoader;
+            return this;
+        }
+
+        public Builder multipleSink(boolean multipleSink) {
+            this.multipleSink = multipleSink;
+            return this;
+        }
+
+        public Builder multipleSinkOption(MultipleSinkOption multipleSinkOption) {
+            this.multipleSinkOption = multipleSinkOption;
+            return this;
+        }
+
         public Builder overwrite(boolean newOverwrite) {
             this.overwrite = newOverwrite;
             return this;
@@ -302,6 +345,11 @@ public class FlinkSink {
         }
 
         private <T> DataStreamSink<T> chainIcebergOperators() {
+            // Multiple-sink jobs use a dedicated operator chain; every other job keeps the single-table chain.
+            if (multipleSink) {
+                return chainIcebergMultipleSinkOperators();
+            }
+            return chainIcebergSingleSinkOperators();
+        }
+
+        private <T> DataStreamSink<T> chainIcebergSingleSinkOperators() {
+
             Preconditions.checkArgument(inputCreator != null,
                     "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
             Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");
@@ -335,6 +383,24 @@ public class FlinkSink {
             return appendDummySink(committerStream);
         }
 
+        /**
+         * Chains the operators of the multiple-sink topology: the schema handling operator and the writers,
+         * a single-parallelism committer, and a dummy discarding sink.
+         *
+         * @return the terminal dummy sink.
+         * @throws IllegalArgumentException if no input stream has been configured.
+         * @throws NullPointerException if the catalog loader or the multiple sink option is missing.
+         */
+        private <T> DataStreamSink<T> chainIcebergMultipleSinkOperators() {
+            Preconditions.checkArgument(inputCreator != null,
+                    "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
+            // Both values are dereferenced when DynamicSchemaHandleOperator opens inside the task, so reject a
+            // missing value here, while the topology is being built, instead of failing later at runtime.
+            Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be null");
+            Preconditions.checkNotNull(multipleSinkOption, "Multiple sink option shouldn't be null");
+            DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);
+
+            // Add parallel writers that append rows to files
+            SingleOutputStreamOperator<MultipleWriteResult> writerStream = appendMultipleWriter(rowDataInput);
+
+            // Add single-parallelism committer that commits files
+            // after successful checkpoint or end of input
+            SingleOutputStreamOperator<Void> committerStream = appendMultipleCommitter(writerStream);
+
+            // Add dummy discard sink
+            return appendDummySink(committerStream);
+        }
+
+
+
         /**
          * Append the iceberg sink operators to write records to iceberg table.
          *
@@ -364,7 +430,8 @@ public class FlinkSink {
         private <T> DataStreamSink<T> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
             DataStreamSink<T> resultStream = committerStream
                     .addSink(new DiscardingSink())
-                    .name(operatorName(String.format("IcebergSink %s", this.table.name())))
+                    .name(operatorName(
+                            String.format("IcebergSink %s", multipleSink ? "whole migration" : this.table.name())))
                     .setParallelism(1);
             if (uidPrefix != null) {
                 resultStream = resultStream.uid(uidPrefix + "-dummysink");
@@ -373,7 +440,8 @@ public class FlinkSink {
         }
 
         private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
-            IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
+            IcebergProcessOperator<WriteResult, Void> filesCommitter = new IcebergProcessOperator<>(
+                    new IcebergSingleFileCommiter(TableIdentifier.of(table.name()), tableLoader, overwrite));
             SingleOutputStreamOperator<Void> committerStream = writerStream
                     .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
                     .setParallelism(1)
@@ -384,6 +452,20 @@ public class FlinkSink {
             return committerStream;
         }
 
+        /**
+         * Appends the committer operator of the multiple-sink topology. The operator wraps an
+         * {@link IcebergMultipleFilesCommiter} and runs with parallelism and max parallelism 1.
+         *
+         * @param writerStream stream of write results produced by the writers.
+         * @return the committer stream, with a uid assigned when a uid prefix is configured.
+         */
+        private SingleOutputStreamOperator<Void> appendMultipleCommitter(
+                SingleOutputStreamOperator<MultipleWriteResult> writerStream) {
+            IcebergMultipleFilesCommiter commitFunction = new IcebergMultipleFilesCommiter(catalogLoader, overwrite);
+            IcebergProcessOperator<MultipleWriteResult, Void> commitOperator =
+                    new IcebergProcessOperator<>(commitFunction);
+
+            SingleOutputStreamOperator<Void> committed = writerStream
+                    .transform(operatorName(ICEBERG_MULTIPLE_FILES_COMMITTER_NAME), Types.VOID, commitOperator)
+                    .setParallelism(1)
+                    .setMaxParallelism(1);
+            return uidPrefix == null ? committed : committed.uid(uidPrefix + "-committer");
+        }
+
         private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
             // Find out the equality field id list based on the user-provided equality field column names.
             List<Integer> equalityFieldIds = Lists.newArrayList();
@@ -416,7 +498,7 @@ public class FlinkSink {
                 }
             }
 
-            IcebergStreamWriter<RowData> streamWriter = createStreamWriter(
+            IcebergProcessOperator<RowData, WriteResult> streamWriter = createStreamWriter(
                     table, flinkRowType, equalityFieldIds, upsertMode, appendMode, inlongMetric, auditHostAndPorts);
 
             int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
@@ -431,6 +513,33 @@ public class FlinkSink {
             return writerStream;
         }
 
+        /**
+         * Appends the two operators that make up the writer stage of the multiple-sink topology: a
+         * {@link DynamicSchemaHandleOperator} followed by an {@link IcebergMultipleStreamWriter}. Both run with
+         * the configured write parallelism, or with the input parallelism when none is configured.
+         *
+         * <p>Equality fields and upsert mode are not configured here; they are initialized at runtime.
+         *
+         * @param input the raw input stream.
+         * @return the stream of write results, with a uid assigned when a uid prefix is configured.
+         */
+        private SingleOutputStreamOperator<MultipleWriteResult> appendMultipleWriter(DataStream<RowData> input) {
+            int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
+            DynamicSchemaHandleOperator routeOperator = new DynamicSchemaHandleOperator(
+                    catalogLoader,
+                    multipleSinkOption);
+            SingleOutputStreamOperator<RecordWithSchema> routeStream = input
+                    .transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME),
+                            TypeInformation.of(RecordWithSchema.class),
+                            routeOperator)
+                    .setParallelism(parallelism);
+
+            IcebergProcessOperator streamWriter =
+                    new IcebergProcessOperator(new IcebergMultipleStreamWriter(appendMode, catalogLoader));
+            // The declared output type must describe the records flowing out of the writer
+            // (MultipleWriteResult), not the operator class that produces them.
+            SingleOutputStreamOperator<MultipleWriteResult> writerStream = routeStream
+                    .transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME),
+                            TypeInformation.of(MultipleWriteResult.class),
+                            streamWriter)
+                    .setParallelism(parallelism);
+            if (uidPrefix != null) {
+                writerStream = writerStream.uid(uidPrefix + "-writer");
+            }
+            return writerStream;
+        }
+
         private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
                 Map<String, String> properties,
                 PartitionSpec partitionSpec,
@@ -487,7 +596,7 @@ public class FlinkSink {
         }
     }
 
-    static IcebergStreamWriter<RowData> createStreamWriter(Table table,
+    static IcebergProcessOperator<RowData, WriteResult> createStreamWriter(Table table,
             RowType flinkRowType,
             List<Integer> equalityFieldIds,
             boolean upsert,
@@ -501,10 +610,11 @@ public class FlinkSink {
 
         Table serializableTable = SerializableTable.copyOf(table);
         TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
-                serializableTable, flinkRowType, targetFileSize,
+                serializableTable, serializableTable.schema(), flinkRowType, targetFileSize,
                 fileFormat, equalityFieldIds, upsert, appendMode);
 
-        return new IcebergStreamWriter<>(table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts);
+        return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>(
+                table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts));
     }
 
     private static FileFormat getFileFormat(Map<String, String> properties) {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
index 97f4f3f02..95549d7d4 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
-class ManifestOutputFileFactory {
+public class ManifestOutputFileFactory {
     // Users could define their own flink manifests directory by setting this value in table properties.
     static final String FLINK_MANIFEST_LOCATION = "flink.manifests.location";
 
@@ -58,7 +58,7 @@ class ManifestOutputFileFactory {
                 attemptNumber, checkpointId, fileCount.incrementAndGet()));
     }
 
-    OutputFile create(long checkpointId) {
+    public OutputFile create(long checkpointId) {
         String flinkManifestDir = props.get(FLINK_MANIFEST_LOCATION);
 
         String newManifestFullPath;
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
index aa724ac7f..f642e86aa 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
@@ -62,6 +62,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
     private transient OutputFileFactory outputFileFactory;
 
     public RowDataTaskWriterFactory(Table table,
+            Schema scheam,
             RowType flinkSchema,
             long targetFileSizeBytes,
             FileFormat format,
@@ -69,7 +70,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
             boolean upsert,
             boolean appendMode) {
         this.table = table;
-        this.schema = table.schema();
+        this.schema = scheam;
         this.flinkSchema = flinkSchema;
         this.spec = table.spec();
         this.io = table.io();
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
new file mode 100644
index 000000000..ea69c12f9
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -0,0 +1,293 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+import org.apache.inlong.sort.base.sink.TableChange;
+import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
+import org.apache.inlong.sort.base.sink.TableChange.DeleteColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE;
+
+public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWithSchema>
+        implements OneInputStreamOperator<RowData, RecordWithSchema>, ProcessingTimeCallback {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicSchemaHandleOperator.class);
+    private static final long HELPER_DEBUG_INTERVEL = 10 * 60 * 1000;
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private final CatalogLoader catalogLoader;
+    private final MultipleSinkOption multipleSinkOption;
+
+    private transient Catalog catalog;
+    private transient SupportsNamespaces asNamespaceCatalog;
+    private transient AbstractDynamicSchemaFormat<JsonNode> dynamicSchemaFormat;
+    private transient ProcessingTimeService processingTimeService;
+
+    // record cache, wait schema to consume record
+    private transient Map<TableIdentifier, Queue<RecordWithSchema>> recordQueues;
+
+    // schema cache
+    private transient Map<TableIdentifier, Schema> schemaCache;
+
+    // blacklist to filter schema update failed table
+    private transient Set<TableIdentifier> blacklist;
+
+    public DynamicSchemaHandleOperator(CatalogLoader catalogLoader,
+            MultipleSinkOption multipleSinkOption) {
+        this.catalogLoader = catalogLoader;
+        this.multipleSinkOption = multipleSinkOption;
+    }
+
+    /**
+     * Creates the empty per-table caches, loads the catalog, resolves the dynamic schema format and registers
+     * the first timer for {@link #onProcessingTime(long)}.
+     */
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        this.recordQueues = new HashMap<>();
+        this.schemaCache = new HashMap<>();
+        this.blacklist = new HashSet<>();
+
+        this.catalog = catalogLoader.loadCatalog();
+        if (catalog instanceof SupportsNamespaces) {
+            this.asNamespaceCatalog = (SupportsNamespaces) catalog;
+        } else {
+            this.asNamespaceCatalog = null;
+        }
+        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+
+        this.processingTimeService = getRuntimeContext().getProcessingTimeService();
+        long firstTrigger = processingTimeService.getCurrentProcessingTime() + HELPER_DEBUG_INTERVEL;
+        processingTimeService.registerTimer(firstTrigger, this);
+    }
+
+    /**
+     * Closes the operator and then the catalog, when the catalog is {@link Closeable}.
+     *
+     * <p>The catalog is released in a {@code finally} block so that it is not leaked when
+     * {@code super.close()} throws.
+     *
+     * @throws Exception if closing the operator or the catalog fails.
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            super.close();
+        } finally {
+            if (catalog instanceof Closeable) {
+                ((Closeable) catalog).close();
+            }
+        }
+    }
+
+    /**
+     * Deserializes the binary payload held in field 0 of the row, resolves the target table and dispatches
+     * the record to the DDL or the DML handler. Records of blacklisted tables are dropped.
+     */
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        JsonNode root = dynamicSchemaFormat.deserialize(element.getValue().getBinary(0));
+        TableIdentifier target = parseId(root);
+
+        if (!blacklist.contains(target)) {
+            if (dynamicSchemaFormat.extractDDLFlag(root)) {
+                execDDL(root, target);
+            } else {
+                execDML(root, target);
+            }
+        }
+    }
+
+    /**
+     * Timer callback: logs the current blacklist and registers the next timer one interval later.
+     *
+     * @param timestamp the time handed in by the timer service; it is only logged.
+     */
+    @Override
+    public void onProcessingTime(long timestamp) {
+        LOG.info("Black list table: {} at time {}.", blacklist, timestamp);
+        long nextTrigger = processingTimeService.getCurrentProcessingTime() + HELPER_DEBUG_INTERVEL;
+        processingTimeService.registerTimer(nextTrigger, this);
+    }
+
+    private void execDDL(JsonNode jsonNode, TableIdentifier tableId) {
+        // TODO: parse the DDL statement and apply it; until then DDL records are ignored.
+    }
+
+    /**
+     * Queues a DML record for its table and then drains that queue: through table creation when no schema
+     * is cached for the table yet, otherwise against the cached schema.
+     */
+    private void execDML(JsonNode jsonNode, TableIdentifier tableId) {
+        RecordWithSchema record = parseRecord(jsonNode, tableId);
+        TableIdentifier recordTableId = record.getTableId();
+        Schema cachedSchema = schemaCache.get(recordTableId);
+        Schema dataSchema = record.getSchema();
+
+        recordQueues.computeIfAbsent(recordTableId, key -> new LinkedList<>()).add(record);
+
+        if (cachedSchema != null) {
+            handleSchemaInfoEvent(recordTableId, cachedSchema);
+        } else {
+            handleTableCreateEventFromOperator(recordTableId, dataSchema);
+        }
+    }
+
+    // ======================== All coordinator interact request and response method ============================
+    /**
+     * Caches {@code schema} as the latest schema of {@code tableId} and drains the records queued for it.
+     *
+     * <p>A record whose field names all exist in the latest schema is refreshed against that schema and
+     * emitted. For any other record a schema change is requested first, which re-enters this method with the
+     * resulting table schema.
+     *
+     * <p>Once the table is blacklisted its remaining queued records are dropped. Without this check a rejected
+     * schema change would leave the incompatible record at the head of the queue, and this method and the
+     * schema change handler would keep calling each other without end.
+     *
+     * @param tableId the table whose queue is drained.
+     * @param schema the schema to cache and to check the queued records against.
+     */
+    private void handleSchemaInfoEvent(TableIdentifier tableId, Schema schema) {
+        schemaCache.put(tableId, schema);
+        Queue<RecordWithSchema> queue = recordQueues.get(tableId);
+        while (queue != null && !queue.isEmpty()) {
+            if (blacklist.contains(tableId)) {
+                // Same policy as processElement: records of a blacklisted table are not processed any further.
+                queue.clear();
+                return;
+            }
+
+            Schema dataSchema = queue.peek().getSchema();
+            if (!isCompatible(schema, dataSchema)) {
+                // The record has fields the table lacks. The handler below either evolves the table and
+                // drains the queue itself, or blacklists the table, which the check above then picks up.
+                handldAlterSchemaEventFromOperator(tableId, schema, dataSchema);
+                continue;
+            }
+
+            // Compatible: the cached schema is current, so bind the record to it and emit it.
+            RecordWithSchema recordWithSchema = queue.poll()
+                    .refreshFieldId(schema)
+                    .refreshRowData((jsonNode, schema1) -> {
+                        try {
+                            return dynamicSchemaFormat.extractRowData(jsonNode, FlinkSchemaUtil.convert(schema1));
+                        } catch (Exception e) {
+                            LOG.warn("Ignore table {} schema change, old: {} new: {}.",
+                                    tableId, dataSchema, schema, e);
+                            blacklist.add(tableId);
+                        }
+                        return Collections.emptyList();
+                    });
+            output.collect(new StreamRecord<>(recordWithSchema));
+        }
+    }
+
+    // ================================ All coordinator handle method ==============================================
+    /**
+     * Makes sure the table exists, creating it (and, for catalogs that support namespaces, its namespace)
+     * when missing, and then continues with the schema loaded from the catalog.
+     *
+     * <p>An {@link AlreadyExistsException} raised by either creation is logged and otherwise ignored.
+     *
+     * @param tableId the table to create when absent.
+     * @param schema the schema used when the table has to be created.
+     */
+    private void handleTableCreateEventFromOperator(TableIdentifier tableId, Schema schema) {
+        boolean tableMissing = !catalog.tableExists(tableId);
+        if (tableMissing) {
+            boolean namespaceMissing =
+                    asNamespaceCatalog != null && !asNamespaceCatalog.namespaceExists(tableId.namespace());
+            if (namespaceMissing) {
+                try {
+                    asNamespaceCatalog.createNamespace(tableId.namespace());
+                    LOG.info("Auto create Database({}) in Catalog({}).", tableId.namespace(), catalog.name());
+                } catch (AlreadyExistsException e) {
+                    LOG.warn("Database({}) already exist in Catalog({})!", tableId.namespace(), catalog.name());
+                }
+            }
+
+            // "engine.hive.enabled" is set for hive visibility
+            Map<String, String> tableProperties = ImmutableMap.<String, String>builder()
+                    .put("format-version", "2")
+                    .put("write.upsert.enabled", "true")
+                    .put("engine.hive.enabled", "true")
+                    .build();
+
+            try {
+                catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), tableProperties);
+                LOG.info("Auto create Table({}) in Database({}) in Catalog({})!",
+                        tableId.name(), tableId.namespace(), catalog.name());
+            } catch (AlreadyExistsException e) {
+                LOG.warn("Table({}) already exist in Database({}) in Catalog({})!",
+                        tableId.name(), tableId.namespace(), catalog.name());
+            }
+        }
+
+        Schema catalogSchema = catalog.loadTable(tableId).schema();
+        handleSchemaInfoEvent(tableId, catalogSchema);
+    }
+
+    private void handldAlterSchemaEventFromOperator(TableIdentifier tableId, Schema oldSchema, Schema newSchema) {
+        Table table = catalog.loadTable(tableId);
+
+        // Changes are only applied when the table's current schema is still the same as the old schema passed in,
+        // so a diff is never applied on top of a schema that differs from the one it was computed against.
+        // Whether a change may be applied is decided from the schema diff (currently only column additions are
+        // supported); when a change cannot be applied, the data is always considered to be at fault.
+        Transaction transaction = table.newTransaction();
+        if (table.schema().sameSchema(oldSchema)) {
+            List<TableChange> tableChanges = SchemaChangeUtils.diffSchema(oldSchema, newSchema);
+            if (canHandleWithSchemaUpdate(tableId, tableChanges)) {
+                SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
+                LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
+            }
+        }
+        transaction.commitTransaction();
+        handleSchemaInfoEvent(tableId, table.schema());
+    }
+
+    // =============================== Utils method =================================================================
+    // The way to judge compatibility is whether all the field names in the old schema exist in the new schema
+    /**
+     * Tells whether every column name of {@code oldSchema} can be found in {@code newSchema}.
+     *
+     * @param newSchema the schema that is searched for the field names.
+     * @param oldSchema the schema whose columns provide the field names.
+     * @return {@code true} when no column name of {@code oldSchema} is missing from {@code newSchema}.
+     */
+    private boolean isCompatible(Schema newSchema, Schema oldSchema) {
+        return oldSchema.columns().stream()
+                .map(NestedField::name)
+                .allMatch(fieldName -> newSchema.findField(fieldName) != null);
+    }
+
+    /**
+     * Builds the target table identifier by evaluating the configured database pattern and table pattern
+     * against the record.
+     */
+    private TableIdentifier parseId(JsonNode data) throws IOException {
+        return TableIdentifier.of(
+                dynamicSchemaFormat.parse(data, multipleSinkOption.getDatabasePattern()),
+                dynamicSchemaFormat.parse(data, multipleSinkOption.getTablePattern()));
+    }
+
+    /**
+     * Wraps the record together with the schema and primary key names extracted from it.
+     *
+     * @param data the deserialized record.
+     * @param tableId the table the record belongs to.
+     * @return the record with its own schema, table identifier and primary key names attached.
+     */
+    private RecordWithSchema parseRecord(JsonNode data, TableIdentifier tableId) {
+        List<String> primaryKeys = dynamicSchemaFormat.extractPrimaryKeyNames(data);
+        RowType rowType = dynamicSchemaFormat.extractSchema(data, primaryKeys);
+        return new RecordWithSchema(
+                data,
+                FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(rowType)),
+                tableId,
+                primaryKeys);
+    }
+
+    /**
+     * Checks every table change against the policy configured for its kind: the add-column policy, the
+     * delete-column policy, or {@code LOG_WITH_IGNORE} for any other change. Every change is evaluated, even
+     * after an earlier one has been rejected. The table is blacklisted when at least one change is rejected.
+     *
+     * @param tableId the table the changes belong to.
+     * @param tableChanges the changes to check.
+     * @return {@code true} when all changes can be handled.
+     */
+    private boolean canHandleWithSchemaUpdate(TableIdentifier tableId, List<TableChange> tableChanges) {
+        boolean allHandled = true;
+        for (TableChange change : tableChanges) {
+            boolean handled;
+            if (change instanceof AddColumn) {
+                handled = MultipleSinkOption.canHandleWithSchemaUpdate(
+                        tableId.toString(), change, multipleSinkOption.getAddColumnPolicy());
+            } else if (change instanceof DeleteColumn) {
+                handled = MultipleSinkOption.canHandleWithSchemaUpdate(
+                        tableId.toString(), change, multipleSinkOption.getDelColumnPolicy());
+            } else {
+                handled = MultipleSinkOption.canHandleWithSchemaUpdate(
+                        tableId.toString(), change, LOG_WITH_IGNORE);
+            }
+            allHandled = allHandled & handled;
+        }
+
+        if (allHandled) {
+            return true;
+        }
+        blacklist.add(tableId);
+        return false;
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleFilesCommiter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleFilesCommiter.java
new file mode 100644
index 000000000..131daba7f
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleFilesCommiter.java
@@ -0,0 +1,106 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.TableLoader;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Committer function of the multiple-sink topology. It keeps one {@link IcebergSingleFileCommiter} per table,
+ * created when the first write result for that table arrives, and forwards the checkpoint and lifecycle
+ * callbacks to every committer created so far.
+ */
+public class IcebergMultipleFilesCommiter extends IcebergProcessFunction<MultipleWriteResult, Void>
+        implements CheckpointedFunction, CheckpointListener, BoundedOneInput {
+
+    // per-table committers, filled on demand by processElement
+    private Map<TableIdentifier, IcebergSingleFileCommiter> multipleCommiters;
+    private final CatalogLoader catalogLoader;
+    private final boolean overwrite;
+
+    // remembered so that committers created after initializeState() can still be initialized with it
+    private transient FunctionInitializationContext functionInitializationContext;
+
+    /**
+     * @param catalogLoader used to build a table loader for each table that is committed to.
+     * @param overwrite passed on to every per-table committer.
+     */
+    public IcebergMultipleFilesCommiter(CatalogLoader catalogLoader, boolean overwrite) {
+        this.catalogLoader = catalogLoader;
+        this.overwrite = overwrite;
+    }
+
+    /** Creates the empty committer registry. */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        multipleCommiters = new HashMap<>();
+    }
+
+    /** Stores the context; it is handed to each per-table committer when that committer is created. */
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        this.functionInitializationContext = context;
+    }
+
+    /**
+     * Routes the write result to the committer of its table, creating, setting up, initializing and opening
+     * that committer first when the table is seen for the first time.
+     */
+    @Override
+    public void processElement(MultipleWriteResult value) throws Exception {
+        TableIdentifier tableId = value.getTableId();
+        IcebergSingleFileCommiter commiter = multipleCommiters.get(tableId);
+        if (commiter == null) {
+            commiter = new IcebergSingleFileCommiter(
+                    tableId, TableLoader.fromCatalog(catalogLoader, value.getTableId()), overwrite);
+            commiter.setup(getRuntimeContext(), collector, context);
+            commiter.initializeState(functionInitializationContext);
+            commiter.open(new Configuration());
+            multipleCommiters.put(tableId, commiter);
+        }
+
+        commiter.processElement(value.getWriteResult());
+    }
+
+    /** Snapshots the state of every per-table committer. */
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        for (IcebergSingleFileCommiter commiter : multipleCommiters.values()) {
+            commiter.snapshotState(context);
+        }
+    }
+
+    /** Notifies every per-table committer that the checkpoint completed. */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        for (IcebergSingleFileCommiter commiter : multipleCommiters.values()) {
+            commiter.notifyCheckpointComplete(checkpointId);
+        }
+    }
+
+    /** Signals the end of input to every per-table committer. */
+    @Override
+    public void endInput() throws Exception {
+        for (IcebergSingleFileCommiter commiter : multipleCommiters.values()) {
+            commiter.endInput();
+        }
+    }
+
+    /** Closes every per-table committer; does nothing when the registry was never created. */
+    @Override
+    public void close() throws Exception {
+        if (multipleCommiters != null) {
+            for (IcebergSingleFileCommiter commiter : multipleCommiters.values()) {
+                commiter.close();
+            }
+        }
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
new file mode 100644
index 000000000..4c3fb0045
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -0,0 +1,207 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+/**
+ * Iceberg writer that can distinguish different sink tables and route and distribute data into different
+ * {@link IcebergSingleStreamWriter}s.
+ *
+ * <p>A writer is created lazily per {@link TableIdentifier} the first time a schema for that table is seen.
+ * When a later record for the same table carries a different schema, the existing writer is asked to evolve
+ * to a task writer factory built from the new schema.
+ */
+public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWithSchema, MultipleWriteResult>
+        implements CheckpointedFunction, BoundedOneInput {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergMultipleStreamWriter.class);
+
+    private final boolean appendMode;
+    private final CatalogLoader catalogLoader;
+
+    // Runtime state, (re)created in open() and therefore null until open() has run.
+    private transient Catalog catalog;
+    private transient Map<TableIdentifier, IcebergSingleStreamWriter<RowData>> multipleWriters;
+    private transient Map<TableIdentifier, Table> multipleTables;
+    private transient Map<TableIdentifier, Schema> multipleSchemas;
+    private transient FunctionInitializationContext functionInitializationContext;
+
+    /**
+     * @param appendMode    flag forwarded to every {@link RowDataTaskWriterFactory} created by this writer
+     * @param catalogLoader loader used in {@link #open(Configuration)} to obtain the catalog
+     */
+    public IcebergMultipleStreamWriter(boolean appendMode, CatalogLoader catalogLoader) {
+        this.appendMode = appendMode;
+        this.catalogLoader = catalogLoader;
+    }
+
+    /**
+     * Loads the catalog and creates the empty per-table registries.
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        this.catalog = catalogLoader.loadCatalog();
+        this.multipleWriters = new HashMap<>();
+        this.multipleTables = new HashMap<>();
+        this.multipleSchemas = new HashMap<>();
+    }
+
+    /**
+     * Closes the catalog when it holds closeable resources.
+     */
+    @Override
+    public void close() throws Exception {
+        if (catalog instanceof Closeable) {
+            ((Closeable) catalog).close();
+        }
+    }
+
+    /**
+     * Signals end of input to every per-table writer.
+     */
+    @Override
+    public void endInput() throws Exception {
+        for (IcebergSingleStreamWriter<RowData> writer : multipleWriters.values()) {
+            writer.endInput();
+        }
+    }
+
+    /**
+     * Disposes every per-table writer and drops all cached tables and schemas.
+     *
+     * <p>The registries are only created in {@link #open(Configuration)}; when the task fails before that
+     * point they are still {@code null}, so each one is checked to avoid masking the original failure with a
+     * {@link NullPointerException}.
+     */
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        if (multipleWriters != null) {
+            for (IcebergSingleStreamWriter<RowData> writer : multipleWriters.values()) {
+                writer.dispose();
+            }
+            multipleWriters.clear();
+        }
+        if (multipleTables != null) {
+            multipleTables.clear();
+        }
+        if (multipleSchemas != null) {
+            multipleSchemas.clear();
+        }
+    }
+
+    /**
+     * Routes the rows of one record to the writer of its table, creating or evolving that writer first when
+     * the record's schema differs from the last schema seen for the table.
+     *
+     * @param recordWithSchema record carrying the target table, its schema, primary keys and row data
+     * @throws IllegalArgumentException if a primary key of the record does not exist in the record's schema
+     * @throws Exception                if loading the table, setting up a writer or writing a row fails
+     */
+    @Override
+    public void processElement(RecordWithSchema recordWithSchema) throws Exception {
+        TableIdentifier tableId = recordWithSchema.getTableId();
+
+        if (isSchemaUpdate(recordWithSchema)) {
+            TaskWriterFactory<RowData> taskWriterFactory = createTaskWriterFactory(tableId, recordWithSchema);
+            IcebergSingleStreamWriter<RowData> existing = multipleWriters.get(tableId);
+            if (existing == null) {
+                IcebergSingleStreamWriter<RowData> writer = new IcebergSingleStreamWriter<>(
+                        tableId.toString(), taskWriterFactory, null, null);
+                // Results of the per-table writer are tagged with the table id before being emitted downstream.
+                writer.setup(getRuntimeContext(),
+                        new CallbackCollector<>(writeResult ->
+                                collector.collect(new MultipleWriteResult(tableId, writeResult))),
+                        context);
+                writer.initializeState(functionInitializationContext);
+                writer.open(new Configuration());
+                multipleWriters.put(tableId, writer);
+            } else {  // only if second times schema will evolute
+                // Refresh new schema maybe cause previous file writer interrupted, so here should handle it
+                existing.schemaEvolution(taskWriterFactory);
+            }
+        }
+
+        IcebergSingleStreamWriter<RowData> writer = multipleWriters.get(tableId);
+        if (writer == null) {
+            LOG.error("Unregistered table schema for {}.", tableId);
+            return;
+        }
+        for (RowData data : recordWithSchema.getData()) {
+            writer.processElement(data);
+        }
+    }
+
+    /**
+     * Asks every per-table writer to prepare for the checkpoint barrier.
+     */
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        for (IcebergSingleStreamWriter<RowData> writer : multipleWriters.values()) {
+            writer.prepareSnapshotPreBarrier(checkpointId);
+        }
+    }
+
+    /**
+     * Forwards the snapshot request to every per-table writer.
+     */
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        for (IcebergSingleStreamWriter<RowData> writer : multipleWriters.values()) {
+            writer.snapshotState(context);
+        }
+    }
+
+    /**
+     * Keeps the initialization context so it can be handed to writers that are created later.
+     */
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        this.functionInitializationContext = context;
+    }
+
+    /**
+     * Builds the task writer factory for the table using the record's schema and the table's properties.
+     * The table is loaded from the catalog on first use and served from the local cache afterwards.
+     */
+    private TaskWriterFactory<RowData> createTaskWriterFactory(
+            TableIdentifier tableId, RecordWithSchema recordWithSchema) {
+        Table table = multipleTables.computeIfAbsent(tableId, catalog::loadTable);
+
+        // refresh some runtime table properties
+        Map<String, String> tableProperties = table.properties();
+        boolean upsertMode = PropertyUtil.propertyAsBoolean(tableProperties,
+                UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
+        long targetFileSizeBytes = PropertyUtil.propertyAsLong(tableProperties,
+                WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+        String formatString = tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+        FileFormat fileFormat = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+
+        return new RowDataTaskWriterFactory(
+                table,
+                recordWithSchema.getSchema(),
+                FlinkSchemaUtil.convert(recordWithSchema.getSchema()),
+                targetFileSizeBytes,
+                fileFormat,
+                resolveEqualityFieldIds(tableId, recordWithSchema),
+                upsertMode,
+                appendMode);
+    }
+
+    /**
+     * Maps the record's primary keys to field ids of its schema; falls back to all columns when the record
+     * declares no primary key.
+     */
+    private static List<Integer> resolveEqualityFieldIds(TableIdentifier tableId, RecordWithSchema recordWithSchema) {
+        Schema schema = recordWithSchema.getSchema();
+        List<Integer> equalityFieldIds = recordWithSchema.getPrimaryKeys().stream()
+                .map(pk -> {
+                    NestedField field = schema.findField(pk);
+                    if (field == null) {
+                        // Fail with context instead of an anonymous NullPointerException.
+                        throw new IllegalArgumentException(
+                                "Primary key '" + pk + "' not found in schema of table " + tableId);
+                    }
+                    return field.fieldId();
+                })
+                .collect(Collectors.toList());
+        // if physical primary key not exist, put all field to logical primary key
+        if (equalityFieldIds.isEmpty()) {
+            equalityFieldIds = schema.columns().stream()
+                    .map(NestedField::fieldId)
+                    .collect(Collectors.toList());
+        }
+        return equalityFieldIds;
+    }
+
+    /**
+     * Reports whether the record's schema differs from the last schema remembered for its table and, if so,
+     * remembers the new one. Note that this also calls {@code replaceSchema()} on the record on every call.
+     */
+    private boolean isSchemaUpdate(RecordWithSchema recordWithSchema) {
+        TableIdentifier tableId = recordWithSchema.getTableId();
+        recordWithSchema.replaceSchema();
+        Schema knownSchema = multipleSchemas.get(tableId);
+        if (knownSchema != null && knownSchema.sameSchema(recordWithSchema.getSchema())) {
+            return false;
+        }
+        LOG.info("Schema evolution with table {}, old schema: {}, new Schema: {}",
+                tableId, knownSchema, recordWithSchema.getSchema());
+        multipleSchemas.put(tableId, recordWithSchema.getSchema());
+        return true;
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergProcessFunction.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergProcessFunction.java
new file mode 100644
index 000000000..6c7034488
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergProcessFunction.java
@@ -0,0 +1,107 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * Base class for functions driven by {@link IcebergProcessOperator}: one element in, zero or more elements
+ * out through {@link #collector}. Besides element processing it exposes optional lifecycle hooks
+ * (pre-barrier snapshot preparation, end of input, dispose) that default to doing nothing.
+ *
+ * @param <IN>  type of the input elements
+ * @param <OUT> type of the emitted elements
+ */
+public abstract class IcebergProcessFunction<IN, OUT> extends AbstractRichFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Sink for produced elements; assigned through {@link #setCollector(Collector)}. */
+    protected transient Collector<OUT> collector;
+
+    /** Per-element context; assigned through {@link #setContext(Context)}. */
+    protected transient Context context;
+
+    /**
+     * Handles a single input element.
+     *
+     * @param value the element to process
+     * @throws Exception if processing fails
+     */
+    public abstract void processElement(IN value) throws Exception;
+
+    /**
+     * Hook invoked before the checkpoint barrier is emitted. Does nothing unless overridden.
+     *
+     * @param checkpointId id of the checkpoint about to be taken
+     */
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        // intentionally empty
+    }
+
+    /** Hook invoked when the input has ended. Does nothing unless overridden. */
+    public void endInput() throws Exception {
+        // intentionally empty
+    }
+
+    /** Hook invoked when the function is disposed. Does nothing unless overridden. */
+    public void dispose() throws Exception {
+        // intentionally empty
+    }
+
+    /**
+     * Convenience initializer that installs the runtime context, the collector and the element context.
+     *
+     * @param t         runtime context to install
+     * @param collector sink for produced elements
+     * @param context   per-element context
+     */
+    public void setup(RuntimeContext t, Collector<OUT> collector, Context context) {
+        setRuntimeContext(t);
+        setCollector(collector);
+        setContext(context);
+    }
+
+    /** Replaces the collector that receives produced elements. */
+    public void setCollector(Collector<OUT> collector) {
+        this.collector = collector;
+    }
+
+    /** Replaces the per-element context. */
+    public void setContext(Context context) {
+        this.context = context;
+    }
+
+    /** Information available to a function while it processes an element. */
+    public abstract static class Context {
+
+        /**
+         * Timestamp of the element currently being processed or timestamp of a firing timer.
+         *
+         * <p>This might be {@code null}, for example if the time characteristic of your program is
+         * set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+         */
+        public abstract Long timestamp();
+
+        /** A {@link TimerService} for querying time and registering timers. */
+        public abstract TimerService timerService();
+
+        /**
+         * Emits a record to the side output identified by the {@link OutputTag}.
+         *
+         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
+         * @param value The record to emit.
+         */
+        public abstract <X> void output(OutputTag<X> outputTag, X value);
+    }
+
+    /**
+     * {@link Collector} that hands every collected element to a callback. Checked exceptions thrown by the
+     * callback are rethrown wrapped in a {@link RuntimeException}.
+     *
+     * @param <T> type of the collected elements
+     */
+    public static class CallbackCollector<T> implements Collector<T> {
+        final ThrowingConsumer<T, Exception> callback;
+
+        CallbackCollector(ThrowingConsumer<T, Exception> callback) {
+            this.callback = callback;
+        }
+
+        @Override
+        public void collect(T t) {
+            try {
+                callback.accept(t);
+            } catch (Exception e) {
+                // Collector#collect declares no checked exception, so the cause is wrapped.
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void close() {
+            // nothing to release
+        }
+    }
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergProcessOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergProcessOperator.java
new file mode 100644
index 000000000..64c36e1b5
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergProcessOperator.java
@@ -0,0 +1,144 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.OutputTag;
+import org.apache.inlong.sort.iceberg.sink.multiple.IcebergProcessFunction.Context;
+
+public class IcebergProcessOperator<IN, OUT>
+        extends AbstractUdfStreamOperator<OUT, IcebergProcessFunction<IN, OUT>>
+        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
+    private static final long serialVersionUID = -1837485654246776219L;
+
+    private long currentWatermark = Long.MIN_VALUE;
+
+    private transient TimestampedCollector<OUT> collector;
+
+    private transient ContextImpl context;
+
+    public IcebergProcessOperator(IcebergProcessFunction<IN, OUT> userFunction) {
+        super(userFunction);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        collector = new TimestampedCollector<>(output);
+        context = new ContextImpl(getProcessingTimeService());
+        userFunction.setCollector(collector);
+        userFunction.setContext(new ContextImpl(getProcessingTimeService()));
+    }
+
+    @Override
+    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
+        collector.setTimestamp(streamRecord);
+        context.element = streamRecord;
+        userFunction.processElement(streamRecord.getValue());
+        context.element = null;
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        super.processWatermark(mark);
+        this.currentWatermark = mark.getTimestamp();
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        super.prepareSnapshotPreBarrier(checkpointId);
+        userFunction.prepareSnapshotPreBarrier(checkpointId);
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        userFunction.endInput();
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        userFunction.dispose();
+    }
+
+    private class ContextImpl extends Context implements TimerService {
+        private StreamRecord<IN> element;
+
+        private final ProcessingTimeService processingTimeService;
+
+        ContextImpl(ProcessingTimeService processingTimeService) {
+            this.processingTimeService = processingTimeService;
+        }
+
+        @Override
+        public Long timestamp() {
+            return element == null ? null : element.getTimestamp();
+        }
+
+        @Override
+        public TimerService timerService() {
+            return this;
+        }
+
+        @Override
+        public <X> void output(OutputTag<X> outputTag, X value) {
+            if (outputTag == null) {
+                throw new IllegalArgumentException("OutputTag must not be null.");
+            }
+            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
+        }
+
+        @Override
+        public long currentProcessingTime() {
+            return processingTimeService.getCurrentProcessingTime();
+        }
+
+        @Override
+        public long currentWatermark() {
+            return currentWatermark;
+        }
+
+        @Override
+        public void registerProcessingTimeTimer(long time) {
+            throw new UnsupportedOperationException(UNSUPPORTED_REGISTER_TIMER_MSG);
+        }
+
+        @Override
+        public void registerEventTimeTimer(long time) {
+            throw new UnsupportedOperationException(UNSUPPORTED_REGISTER_TIMER_MSG);
+        }
+
+        @Override
+        public void deleteProcessingTimeTimer(long time) {
+            throw new UnsupportedOperationException(UNSUPPORTED_DELETE_TIMER_MSG);
+        }
+
+        @Override
+        public void deleteEventTimeTimer(long time) {
+            throw new UnsupportedOperationException(UNSUPPORTED_DELETE_TIMER_MSG);
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
similarity index 83%
rename from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
rename to inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
index 7942ef77c..bd28a22a0 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
@@ -1,392 +1,393 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.inlong.sort.iceberg.sink;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.core.io.SimpleVersionedSerialization;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
-import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.ReplacePartitions;
-import org.apache.iceberg.RowDelta;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotUpdate;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.io.WriteResult;
-import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.base.Strings;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Comparators;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.PropertyUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.SortedMap;
-
-/**
- * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
- */
-class IcebergFilesCommitter extends AbstractStreamOperator<Void>
-        implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {
-
-    private static final long serialVersionUID = 1L;
-    private static final long INITIAL_CHECKPOINT_ID = -1L;
-    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
-
-    private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
-    private static final String FLINK_JOB_ID = "flink.job-id";
-
-    // The max checkpoint id we've committed to iceberg table. As the flink's checkpoint is always increasing, so we
-    // could correctly commit all the data files whose checkpoint id is greater than the max committed one to iceberg
-    // table, for avoiding committing the same data files twice. This id will be attached to iceberg's meta when
-    // committing the iceberg transaction.
-    private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
-    static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
-
-    // TableLoader to load iceberg table lazily.
-    private final TableLoader tableLoader;
-    private final boolean replacePartitions;
-
-    // A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
-    // to iceberg table). We need a sorted map here because it's possible that a few checkpoint snapshots failed, for
-    // example: the 1st checkpoint has 2 data files <1, <file0, file1>>, the 2nd checkpoint has 1 data file
-    // <2, <file3>>. Snapshot for checkpoint#1 interrupted because of network/disk failure etc, while we don't expect
-    // any data loss in iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry to commit
-    // iceberg table when the next checkpoint happens.
-    private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
-
-    // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
-    // 'dataFilesPerCheckpoint'.
-    private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
-
-    // It will have an unique identifier for one job.
-    private transient String flinkJobId;
-    private transient Table table;
-    private transient ManifestOutputFileFactory manifestOutputFileFactory;
-    private transient long maxCommittedCheckpointId;
-    private transient int continuousEmptyCheckpoints;
-    private transient int maxContinuousEmptyCommits;
-    // There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by
-    // the same flink job; another case is restoring from snapshot created by another different job. For the second
-    // case, we need to maintain the old flink job's id in flink state backend to find the max-committed-checkpoint-id
-    // when traversing iceberg table's snapshots.
-    private static final ListStateDescriptor<String> JOB_ID_DESCRIPTOR = new ListStateDescriptor<>(
-            "iceberg-flink-job-id", BasicTypeInfo.STRING_TYPE_INFO);
-    private transient ListState<String> jobIdState;
-    // All pending checkpoints states for this function.
-    private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
-    private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
-
-    /**
-     * Creates a committer; both arguments are only stored here.
-     *
-     * @param tableLoader       loader used later to open and load the iceberg table
-     * @param replacePartitions whether commits go through the replace-partitions path instead of a row delta
-     */
-    IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions) {
-        this.replacePartitions = replacePartitions;
-        this.tableLoader = tableLoader;
-    }
-
-    /**
-     * Loads the table, prepares the manifest output factory and the operator state, and - when restoring
-     * from a checkpoint - commits every data file recorded in state that is newer than the max checkpoint id
-     * already committed to the table by the restored job.
-     *
-     * @param context state initialization context providing the operator state store and the restore flag
-     * @throws Exception if loading the table, reading state or committing the restored files fails
-     */
-    @Override
-    public void initializeState(StateInitializationContext context) throws Exception {
-        super.initializeState(context);
-        this.flinkJobId = getContainingTask().getEnvironment().getJobID().toString();
-
-        // Open the table loader and load the table.
-        this.tableLoader.open();
-        this.table = tableLoader.loadTable();
-
-        // Table property with a default of 10; must be strictly positive.
-        maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
-        Preconditions.checkArgument(maxContinuousEmptyCommits > 0,
-                MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive");
-
-        int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
-        int attemptId = getRuntimeContext().getAttemptNumber();
-        this.manifestOutputFileFactory = FlinkManifestUtil
-                .createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
-        this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
-
-        this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
-        this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
-        if (context.isRestored()) {
-            // NOTE(review): next() assumes the restored list state holds at least one element - confirm.
-            String restoredFlinkJobId = jobIdState.get().iterator().next();
-            Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
-                    "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
-
-            // Since flink's checkpoint id will start from the max-committed-checkpoint-id + 1 in the new flink job even
-            // if it's restored from a snapshot created by another different flink job, so it's safe to assign the max
-            // committed checkpoint id from restored flink job to the current flink job.
-            this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
-
-            // Keep only the entries strictly newer than what has already been committed.
-            NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
-                    .newTreeMap(checkpointsState.get().iterator().next())
-                    .tailMap(maxCommittedCheckpointId, false);
-            if (!uncommittedDataFiles.isEmpty()) {
-                // Committed all uncommitted data files from the old flink job to iceberg table.
-                long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
-                commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
-            }
-        }
-    }
-
-    /**
-     * Writes the results buffered for the current checkpoint to a manifest, records it under the checkpoint
-     * id and replaces the contents of both operator list states with the latest values.
-     *
-     * @param context snapshot context supplying the checkpoint id
-     * @throws Exception if writing the manifest or updating the state fails
-     */
-    @Override
-    public void snapshotState(StateSnapshotContext context) throws Exception {
-        super.snapshotState(context);
-        final long ckptId = context.getCheckpointId();
-        LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, ckptId);
-
-        // Register the manifest of this checkpoint among the pending ones.
-        byte[] manifestBytes = writeToManifest(ckptId);
-        dataFilesPerCheckpoint.put(ckptId, manifestBytes);
-
-        // Replace the stored pending-checkpoint map with the current one.
-        checkpointsState.clear();
-        checkpointsState.add(dataFilesPerCheckpoint);
-
-        // Replace the stored job id with the id of the running job.
-        jobIdState.clear();
-        jobIdState.add(flinkJobId);
-
-        // The buffered results now live in the manifest; start the next checkpoint with an empty buffer.
-        writeResultsOfCurrentCkpt.clear();
-    }
-
-    /**
-     * Commits all pending files up to the completed checkpoint, unless a checkpoint with an equal or larger
-     * id has already been committed.
-     *
-     * @param checkpointId id of the completed checkpoint
-     * @throws Exception if the commit fails
-     */
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        super.notifyCheckpointComplete(checkpointId);
-        // Notifications may arrive out of order, e.g.:
-        //   1. snapshotState(ckpId);
-        //   2. snapshotState(ckpId+1);
-        //   3. notifyCheckpointComplete(ckpId+1);
-        //   4. notifyCheckpointComplete(ckpId);
-        // Step#3 already committed every file, so step#4 must be ignored; this also keeps the
-        // max-committed-checkpoint-id increasing.
-        if (checkpointId <= maxCommittedCheckpointId) {
-            return;
-        }
-        commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
-        maxCommittedCheckpointId = checkpointId;
-    }
-
-    /**
-     * Commits every pending manifest whose checkpoint id is less than or equal to {@code checkpointId}, then
-     * removes those entries from {@code deltaManifestsMap} and deletes the temporary manifest files.
-     *
-     * <p>A commit is issued when there is at least one data or delete file, or when the number of
-     * consecutive empty checkpoints reaches a multiple of {@code maxContinuousEmptyCommits}.
-     *
-     * @param deltaManifestsMap serialized delta manifests keyed by checkpoint id; committed entries are removed
-     * @param newFlinkJobId     job id passed on to the commit and used in the cleanup warning
-     * @param checkpointId      highest checkpoint id (inclusive) to commit
-     * @throws IOException if a serialized manifest cannot be read back
-     */
-    private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
-            String newFlinkJobId,
-            long checkpointId) throws IOException {
-        // headMap returns a view: clearing it below removes the entries from deltaManifestsMap itself.
-        NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
-        List<ManifestFile> manifests = Lists.newArrayList();
-        NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
-        for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
-            if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
-                // Skip the empty flink manifest.
-                continue;
-            }
-
-            DeltaManifests deltaManifests = SimpleVersionedSerialization
-                    .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
-            pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
-            manifests.addAll(deltaManifests.manifests());
-        }
-
-        int totalFiles = pendingResults.values().stream()
-                .mapToInt(r -> r.dataFiles().length + r.deleteFiles().length).sum();
-        // Count consecutive empty checkpoints; any checkpoint with files resets the counter.
-        continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
-        if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
-            if (replacePartitions) {
-                replacePartitions(pendingResults, newFlinkJobId, checkpointId);
-            } else {
-                commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
-            }
-            continuousEmptyCheckpoints = 0;
-        }
-        pendingMap.clear();
-
-        // Delete the committed manifests.
-        for (ManifestFile manifest : manifests) {
-            try {
-                table.io().deleteFile(manifest.path());
-            } catch (Exception e) {
-                // The flink manifests cleaning failure shouldn't abort the completed checkpoint.
-                String details = MoreObjects.toStringHelper(this)
-                        .add("flinkJobId", newFlinkJobId)
-                        .add("checkpointId", checkpointId)
-                        .add("manifestPath", manifest.path())
-                        .toString();
-                LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink "
-                                + "manifests: {}", details, e);
-            }
-        }
-    }
-
-    private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId,
-            long checkpointId) {
-        // Partition overwrite does not support delete files.
-        int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
-        Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions with delete files.");
-
-        // Commit the overwrite transaction.
-        ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
-
-        int numFiles = 0;
-        for (WriteResult result : pendingResults.values()) {
-            Preconditions.checkState(result.referencedDataFiles().length == 0,
-                    "Should have no referenced data files.");
-
-            numFiles += result.dataFiles().length;
-            Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
-        }
-
-        commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId);
-    }
-
-    private void commitDeltaTxn(
-            NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
-        int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
-
-        if (deleteFilesNum == 0) {
-            // To be compatible with iceberg format V1.
-            AppendFiles appendFiles = table.newAppend();
-
-            int numFiles = 0;
-            for (WriteResult result : pendingResults.values()) {
-                Preconditions.checkState(result.referencedDataFiles().length == 0,
-                        "Should have no referenced data files.");
-
-                numFiles += result.dataFiles().length;
-                Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
-            }
-
-            commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
-        } else {
-            // To be compatible with iceberg format V2.
-            for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
-                // We don't commit the merged result into a single transaction because for the sequential transaction
-                // txn1 and txn2, the equality-delete files of txn2 are required to be applied to data files from txn1.
-                // Committing the merged one will lead to the incorrect delete semantic.
-                WriteResult result = e.getValue();
-
-                // Row delta validations are not needed for streaming changes that write equality deletes. Equality
-                // deletes are applied to data in all previous sequence numbers, so retries may push deletes further in
-                // the future, but do not affect correctness. Position deletes committed to the table in this path are
-                // used only to delete rows from data files that are being added in this commit. There is no way for
-                // data files added along with the delete files to be concurrently removed, so there is no need to
-                // validate the files referenced by the position delete files that are being committed.
-                RowDelta rowDelta = table.newRowDelta();
-
-                int numDataFiles = result.dataFiles().length;
-                Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
-
-                int numDeleteFiles = result.deleteFiles().length;
-                Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
-
-                commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, e.getKey());
-            }
-        }
-    }
-
-    private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int numDeleteFiles, String description,
-            String newFlinkJobId, long checkpointId) {
-        LOG.info("Committing {} with {} data files and {} delete files to table {}", description, numDataFiles,
-                numDeleteFiles, table);
-        operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
-        operation.set(FLINK_JOB_ID, newFlinkJobId);
-
-        long start = System.currentTimeMillis();
-        operation.commit(); // abort is automatically called if this fails.
-        long duration = System.currentTimeMillis() - start;
-        LOG.info("Committed in {} ms", duration);
-    }
-
-    @Override
-    public void processElement(StreamRecord<WriteResult> element) {
-        this.writeResultsOfCurrentCkpt.add(element.getValue());
-    }
-
-    @Override
-    public void endInput() throws IOException {
-        // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
-        long currentCheckpointId = Long.MAX_VALUE;
-        dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
-        writeResultsOfCurrentCkpt.clear();
-
-        commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
-    }
-
-    /**
-     * Write all the complete data files to a newly created manifest file and return the manifest's avro serialized
-     * bytes.
-     */
-    private byte[] writeToManifest(long checkpointId) throws IOException {
-        if (writeResultsOfCurrentCkpt.isEmpty()) {
-            return EMPTY_MANIFEST_DATA;
-        }
-
-        WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
-        DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(result,
-                () -> manifestOutputFileFactory.create(checkpointId), table.spec());
-
-        return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, deltaManifests);
-    }
-
-    @Override
-    public void dispose() throws Exception {
-        if (tableLoader != null) {
-            tableLoader.close();
-        }
-    }
-
-    private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor() {
-        Comparator<Long> longComparator = Comparators.forType(Types.LongType.get());
-        // Construct a SortedMapTypeInfo.
-        SortedMapTypeInfo<Long, byte[]> sortedMapTypeInfo = new SortedMapTypeInfo<>(
-                BasicTypeInfo.LONG_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, longComparator
-        );
-        return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
-    }
-
-    static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
-        Snapshot snapshot = table.currentSnapshot();
-        long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
-
-        while (snapshot != null) {
-            Map<String, String> summary = snapshot.summary();
-            String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
-            if (flinkJobId.equals(snapshotFlinkJobId)) {
-                String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
-                if (value != null) {
-                    lastCommittedCheckpointId = Long.parseLong(value);
-                    break;
-                }
-            }
-            Long parentSnapshotId = snapshot.parentId();
-            snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
-        }
-
-        return lastCommittedCheckpointId;
-    }
-}
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifests;
+import org.apache.inlong.sort.iceberg.sink.DeltaManifestsSerializer;
+import org.apache.inlong.sort.iceberg.sink.FlinkManifestUtil;
+import org.apache.inlong.sort.iceberg.sink.ManifestOutputFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+
+/**
+ * Commits the data files and delete files written for a single iceberg table.
+ *
+ * <p>Incoming {@link WriteResult}s are buffered until the next checkpoint barrier. On {@code snapshotState} they are
+ * written to a temporary flink manifest whose serialized form is stored in operator state, and on
+ * {@code notifyCheckpointComplete} every pending manifest up to the completed checkpoint is committed to the table.
+ *
+ * <p>The names of both state descriptors embed the table identifier, because several instances of this class share
+ * one state store inside an IcebergMultipleFilesCommiter.
+ */
+public class IcebergSingleFileCommiter extends IcebergProcessFunction<WriteResult, Void>
+        implements CheckpointedFunction, CheckpointListener {
+    private static final long serialVersionUID = 1L;
+    private static final long INITIAL_CHECKPOINT_ID = -1L;
+    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
+
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergSingleFileCommiter.class);
+    private static final String FLINK_JOB_ID = "flink.job-id";
+
+    // The max checkpoint id we've committed to the iceberg table. As flink's checkpoint id is always increasing, we
+    // can safely commit all the data files whose checkpoint id is greater than the max committed one to the iceberg
+    // table, which avoids committing the same data files twice. This id is attached to iceberg's snapshot summary
+    // when committing the iceberg transaction.
+    private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+    static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
+
+    // TableLoader to load iceberg table lazily.
+    private final TableLoader tableLoader;
+    private final boolean replacePartitions;
+
+    // A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
+    // to the iceberg table). We need a sorted map here because it's possible that a few checkpoint snapshots fail,
+    // for example: the 1st checkpoint has 2 data files <1, <file0, file1>>, the 2nd checkpoint has 1 data file
+    // <2, <file3>>. Snapshot for checkpoint#1 is interrupted because of network/disk failure etc, while we don't
+    // expect any data loss in the iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry
+    // to commit them to the iceberg table when the next checkpoint happens.
+    private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
+
+    // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
+    // 'dataFilesPerCheckpoint'.
+    private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
+
+    // A unique identifier of the running flink job.
+    private transient String flinkJobId;
+    private transient Table table;
+    private transient ManifestOutputFileFactory manifestOutputFileFactory;
+    private transient long maxCommittedCheckpointId;
+    private transient int continuousEmptyCheckpoints;
+    private transient int maxContinuousEmptyCommits;
+    // There are two cases in which we restore from flink checkpoints: the first case is restoring from a snapshot
+    // created by the same flink job; the other case is restoring from a snapshot created by a different job. For the
+    // second case, we need to maintain the old flink job's id in flink state backend to find the
+    // max-committed-checkpoint-id when traversing iceberg table's snapshots.
+    private final ListStateDescriptor<String> jobIdDescriptor;
+    private transient ListState<String> jobIdState;
+    // All pending checkpoints states for this function.
+    private final ListStateDescriptor<SortedMap<Long, byte[]>> stateDescriptor;
+    private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
+
+    /**
+     * Creates a committer for one iceberg table.
+     *
+     * @param tableId identifier of the table, embedded into the names of the state descriptors
+     * @param tableLoader loader used to open and load the table in {@link #initializeState}
+     * @param replacePartitions whether pending files are committed as a dynamic partition overwrite instead of an
+     *        append / row delta
+     */
+    public IcebergSingleFileCommiter(TableIdentifier tableId, TableLoader tableLoader, boolean replacePartitions) {
+        // The state descriptors must be distinguished by tableId, because the states of all
+        // IcebergSingleFileCommiter instances in one IcebergMultipleFilesCommiter use the same StateStore.
+        this.tableLoader = tableLoader;
+        this.replacePartitions = replacePartitions;
+        this.jobIdDescriptor = new ListStateDescriptor<>(
+                String.format("iceberg(%s)-flink-job-id", tableId.toString()), BasicTypeInfo.STRING_TYPE_INFO);
+        this.stateDescriptor = buildStateDescriptor(tableId);
+    }
+
+    /**
+     * Loads the table, prepares the manifest output file factory, registers the operator states and, when the job
+     * is restored, commits the files that were checkpointed but not yet committed to the table.
+     *
+     * @param context the initialization context giving access to the operator state store
+     * @throws Exception if the table cannot be loaded or the restored files cannot be committed
+     */
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        this.flinkJobId = getRuntimeContext().getJobId().toString();
+
+        // Open the table loader and load the table.
+        this.tableLoader.open();
+        this.table = tableLoader.loadTable();
+
+        maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
+        Preconditions.checkArgument(maxContinuousEmptyCommits > 0,
+                MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive");
+
+        int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+        int attemptId = getRuntimeContext().getAttemptNumber();
+        this.manifestOutputFileFactory = FlinkManifestUtil
+                .createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+        this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+
+        this.checkpointsState = context.getOperatorStateStore().getListState(stateDescriptor);
+        this.jobIdState = context.getOperatorStateStore().getListState(jobIdDescriptor);
+        if (context.isRestored()) {
+            commitRestoredCheckpoints();
+        }
+    }
+
+    /**
+     * Recovers the max committed checkpoint id from the table and commits the restored files that are newer than it.
+     * Does nothing besides logging when no state was restored under this table's descriptor names.
+     */
+    private void commitRestoredCheckpoints() throws Exception {
+        // The job being restored does not guarantee that state exists under the table specific descriptor names of
+        // this instance, so an unconditional iterator().next() would fail with NoSuchElementException.
+        // NOTE(review): presumably this happens for a table that is written for the first time after the job was
+        // restored - confirm against IcebergMultipleFilesCommiter.
+        Iterable<String> restoredJobIds = jobIdState.get();
+        if (restoredJobIds == null || !restoredJobIds.iterator().hasNext()) {
+            LOG.warn("No flink job id found in the restored state of table {}, skip recovering uncommitted files.",
+                    table);
+            return;
+        }
+
+        String restoredFlinkJobId = restoredJobIds.iterator().next();
+        Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
+                "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
+
+        // Since flink's checkpoint id will start from the max-committed-checkpoint-id + 1 in the new flink job even
+        // if it's restored from a snapshot created by another different flink job, so it's safe to assign the max
+        // committed checkpoint id from restored flink job to the current flink job.
+        this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
+
+        Iterable<SortedMap<Long, byte[]>> restoredCheckpoints = checkpointsState.get();
+        if (restoredCheckpoints == null || !restoredCheckpoints.iterator().hasNext()) {
+            // No pending checkpoints were stored, so there is nothing left to commit.
+            return;
+        }
+
+        NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
+                .newTreeMap(restoredCheckpoints.iterator().next())
+                .tailMap(maxCommittedCheckpointId, false);
+        if (!uncommittedDataFiles.isEmpty()) {
+            // Commit all uncommitted data files from the old flink job to the iceberg table.
+            long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
+            commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
+        }
+    }
+
+    /**
+     * Writes the buffered write results of the current checkpoint to a manifest and stores all pending checkpoints
+     * together with the current flink job id in operator state.
+     *
+     * @param context the snapshot context providing the checkpoint id
+     * @throws Exception if the manifest cannot be written or the state cannot be updated
+     */
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        long checkpointId = context.getCheckpointId();
+        LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", table, checkpointId);
+
+        // Update the checkpoint state.
+        dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));
+        // Reset the snapshot state to the latest state.
+        checkpointsState.clear();
+        checkpointsState.add(dataFilesPerCheckpoint);
+
+        jobIdState.clear();
+        jobIdState.add(flinkJobId);
+
+        // Clear the local buffer for current checkpoint.
+        writeResultsOfCurrentCkpt.clear();
+    }
+
+    /**
+     * Commits all pending files up to the completed checkpoint, unless a later checkpoint was already committed.
+     *
+     * @param checkpointId id of the checkpoint that has been completed
+     * @throws Exception if committing to the iceberg table fails
+     */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // It's possible that we have the following events:
+        //   1. snapshotState(ckpId);
+        //   2. snapshotState(ckpId+1);
+        //   3. notifyCheckpointComplete(ckpId+1);
+        //   4. notifyCheckpointComplete(ckpId);
+        // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
+        // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
+        if (checkpointId > maxCommittedCheckpointId) {
+            commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
+            this.maxCommittedCheckpointId = checkpointId;
+        }
+    }
+
+    /**
+     * Commits every pending manifest whose checkpoint id is not greater than {@code checkpointId}, removes the
+     * committed entries from {@code deltaManifestsMap} and deletes the temporary flink manifests afterwards.
+     *
+     * @param deltaManifestsMap serialized delta manifests keyed by checkpoint id; committed entries are removed
+     * @param newFlinkJobId flink job id recorded in the snapshot summary of the commit
+     * @param checkpointId the highest checkpoint id (inclusive) to commit
+     * @throws IOException if a serialized manifest cannot be read
+     */
+    private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
+            String newFlinkJobId,
+            long checkpointId) throws IOException {
+        // headMap returns a view, so clearing it below also removes the entries from deltaManifestsMap.
+        NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
+        List<ManifestFile> manifests = Lists.newArrayList();
+        NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
+        for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
+            if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
+                // Skip the empty flink manifest.
+                continue;
+            }
+
+            DeltaManifests deltaManifests = SimpleVersionedSerialization
+                    .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
+            pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
+            manifests.addAll(deltaManifests.manifests());
+        }
+
+        int totalFiles = pendingResults.values().stream()
+                .mapToInt(r -> r.dataFiles().length + r.deleteFiles().length).sum();
+        continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
+        // Commit when there are files, or once every 'maxContinuousEmptyCommits' consecutive empty checkpoints.
+        if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
+            if (replacePartitions) {
+                replacePartitions(pendingResults, newFlinkJobId, checkpointId);
+            } else {
+                commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
+            }
+            continuousEmptyCheckpoints = 0;
+        }
+        pendingMap.clear();
+
+        // Delete the committed manifests.
+        for (ManifestFile manifest : manifests) {
+            try {
+                table.io().deleteFile(manifest.path());
+            } catch (Exception e) {
+                // The flink manifests cleaning failure shouldn't abort the completed checkpoint.
+                String details = MoreObjects.toStringHelper(this)
+                        .add("flinkJobId", newFlinkJobId)
+                        .add("checkpointId", checkpointId)
+                        .add("manifestPath", manifest.path())
+                        .toString();
+                LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink "
+                        + "manifests: {}", details, e);
+            }
+        }
+    }
+
+    /**
+     * Commits the data files of all pending results as one dynamic partition overwrite.
+     *
+     * @param pendingResults write results keyed by checkpoint id; must not contain delete files
+     * @param newFlinkJobId flink job id recorded in the snapshot summary of the commit
+     * @param checkpointId checkpoint id recorded in the snapshot summary of the commit
+     */
+    private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId,
+            long checkpointId) {
+        // Partition overwrite does not support delete files.
+        int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
+        Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions with delete files.");
+
+        // Commit the overwrite transaction.
+        ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
+
+        int numFiles = 0;
+        for (WriteResult result : pendingResults.values()) {
+            Preconditions.checkState(result.referencedDataFiles().length == 0,
+                    "Should have no referenced data files.");
+
+            numFiles += result.dataFiles().length;
+            Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
+        }
+
+        commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId);
+    }
+
+    /**
+     * Commits the pending results either as a single append (when there are no delete files) or as one row delta
+     * per checkpoint (when delete files exist).
+     *
+     * @param pendingResults write results keyed by checkpoint id
+     * @param newFlinkJobId flink job id recorded in the snapshot summary of each commit
+     * @param checkpointId checkpoint id recorded in the snapshot summary of the append commit; row delta commits
+     *        record the checkpoint id of their own entry instead
+     */
+    private void commitDeltaTxn(
+            NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
+        int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
+
+        if (deleteFilesNum == 0) {
+            // To be compatible with iceberg format V1.
+            AppendFiles appendFiles = table.newAppend();
+
+            int numFiles = 0;
+            for (WriteResult result : pendingResults.values()) {
+                Preconditions.checkState(result.referencedDataFiles().length == 0,
+                        "Should have no referenced data files.");
+
+                numFiles += result.dataFiles().length;
+                Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
+            }
+
+            commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
+        } else {
+            // To be compatible with iceberg format V2.
+            for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
+                // We don't commit the merged result into a single transaction because for the sequential transaction
+                // txn1 and txn2, the equality-delete files of txn2 are required to be applied to data files from txn1.
+                // Committing the merged one will lead to the incorrect delete semantic.
+                WriteResult result = e.getValue();
+
+                // Row delta validations are not needed for streaming changes that write equality deletes. Equality
+                // deletes are applied to data in all previous sequence numbers, so retries may push deletes further in
+                // the future, but do not affect correctness. Position deletes committed to the table in this path are
+                // used only to delete rows from data files that are being added in this commit. There is no way for
+                // data files added along with the delete files to be concurrently removed, so there is no need to
+                // validate the files referenced by the position delete files that are being committed.
+                RowDelta rowDelta = table.newRowDelta();
+
+                int numDataFiles = result.dataFiles().length;
+                Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
+
+                int numDeleteFiles = result.deleteFiles().length;
+                Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
+
+                commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, e.getKey());
+            }
+        }
+    }
+
+    /**
+     * Tags the operation with the checkpoint id and the flink job id, commits it and logs the elapsed time.
+     *
+     * @param operation the pending iceberg snapshot update
+     * @param numDataFiles number of data files in the operation, used for logging only
+     * @param numDeleteFiles number of delete files in the operation, used for logging only
+     * @param description short name of the operation, used for logging only
+     * @param newFlinkJobId value stored under the {@code flink.job-id} summary property
+     * @param checkpointId value stored under the {@code flink.max-committed-checkpoint-id} summary property
+     */
+    private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int numDeleteFiles, String description,
+            String newFlinkJobId, long checkpointId) {
+        LOG.info("Committing {} with {} data files and {} delete files to table {}", description, numDataFiles,
+                numDeleteFiles, table);
+        operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
+        operation.set(FLINK_JOB_ID, newFlinkJobId);
+
+        long start = System.currentTimeMillis();
+        operation.commit(); // abort is automatically called if this fails.
+        long duration = System.currentTimeMillis() - start;
+        LOG.info("Committed in {} ms", duration);
+    }
+
+    /**
+     * Buffers a write result until the next checkpoint is snapshotted.
+     *
+     * @param value the write result to buffer for the current checkpoint
+     */
+    @Override
+    public void processElement(WriteResult value)
+            throws Exception {
+        this.writeResultsOfCurrentCkpt.add(value);
+    }
+
+    /**
+     * Flushes the buffered write results under the checkpoint id {@link Long#MAX_VALUE} and commits everything
+     * that is still pending.
+     *
+     * @throws IOException if the manifest cannot be written or read
+     */
+    @Override
+    public void endInput() throws IOException {
+        // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
+        long currentCheckpointId = Long.MAX_VALUE;
+        dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
+        writeResultsOfCurrentCkpt.clear();
+
+        commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
+    }
+
+    /**
+     * Write all the complete data files to a newly created manifest file and return the manifest's avro serialized
+     * bytes.
+     *
+     * @param checkpointId checkpoint id passed to the manifest output file factory
+     * @return the serialized delta manifests, or {@code EMPTY_MANIFEST_DATA} when nothing was buffered
+     * @throws IOException if the manifest cannot be written or serialized
+     */
+    private byte[] writeToManifest(long checkpointId) throws IOException {
+        if (writeResultsOfCurrentCkpt.isEmpty()) {
+            return EMPTY_MANIFEST_DATA;
+        }
+
+        WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
+        DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(result,
+                () -> manifestOutputFileFactory.create(checkpointId), table.spec());
+
+        return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, deltaManifests);
+    }
+
+    /**
+     * Closes the table loader, if any.
+     */
+    @Override
+    public void dispose() throws Exception {
+        if (tableLoader != null) {
+            tableLoader.close();
+        }
+    }
+
+    /**
+     * Builds the descriptor of the pending-checkpoints state; its name embeds the table identifier.
+     *
+     * @param tableId identifier of the table this committer writes to
+     * @return a list state descriptor holding sorted maps from checkpoint id to serialized delta manifests
+     */
+    private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor(TableIdentifier tableId) {
+        Comparator<Long> longComparator = Comparators.forType(Types.LongType.get());
+        // Construct a SortedMapTypeInfo.
+        SortedMapTypeInfo<Long, byte[]> sortedMapTypeInfo = new SortedMapTypeInfo<>(
+                BasicTypeInfo.LONG_TYPE_INFO, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, longComparator
+        );
+        return new ListStateDescriptor<>(
+                String.format("iceberg(%s)-files-committer-state", tableId.toString()), sortedMapTypeInfo);
+    }
+
+    /**
+     * Walks the snapshot history from the current snapshot towards its ancestors and returns the checkpoint id
+     * recorded by the most recent snapshot that was committed by the given flink job.
+     *
+     * @param table the iceberg table whose snapshots are inspected
+     * @param flinkJobId the flink job id to look for in the snapshot summaries
+     * @return the recorded max committed checkpoint id, or {@code -1} when no matching snapshot exists
+     */
+    static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
+        Snapshot snapshot = table.currentSnapshot();
+        long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+
+        while (snapshot != null) {
+            Map<String, String> summary = snapshot.summary();
+            String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
+            if (flinkJobId.equals(snapshotFlinkJobId)) {
+                String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
+                if (value != null) {
+                    lastCommittedCheckpointId = Long.parseLong(value);
+                    break;
+                }
+            }
+            Long parentSnapshotId = snapshot.parentId();
+            snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
+        }
+
+        return lastCommittedCheckpointId;
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
similarity index 68%
rename from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
rename to inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
index 6f1b75ea0..1fd8586b3 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
@@ -1,177 +1,176 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.inlong.sort.iceberg.sink;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.iceberg.flink.sink.TaskWriterFactory;
-import org.apache.iceberg.io.TaskWriter;
-import org.apache.iceberg.io.WriteResult;
-import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
-import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
-import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
-import org.apache.inlong.sort.base.util.MetricStateUtils;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
-import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
-import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
-
-/**
- * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
- */
-class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
-        implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {
-
-    private static final long serialVersionUID = 1L;
-
-    private final String fullTableName;
-    private final TaskWriterFactory<T> taskWriterFactory;
-    private final String inlongMetric;
-    private final String auditHostAndPorts;
-
-    private transient TaskWriter<T> writer;
-    private transient int subTaskId;
-    private transient int attemptId;
-    @Nullable
-    private transient SinkMetricData metricData;
-    private transient ListState<MetricState> metricStateListState;
-    private transient MetricState metricState;
-
-    IcebergStreamWriter(
-            String fullTableName,
-            TaskWriterFactory<T> taskWriterFactory,
-            String inlongMetric,
-            String auditHostAndPorts) {
-        this.fullTableName = fullTableName;
-        this.taskWriterFactory = taskWriterFactory;
-        this.inlongMetric = inlongMetric;
-        this.auditHostAndPorts = auditHostAndPorts;
-        setChainingStrategy(ChainingStrategy.ALWAYS);
-    }
-
-    @Override
-    public void open() {
-        this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
-        this.attemptId = getRuntimeContext().getAttemptNumber();
-
-        // Initialize the task writer factory.
-        this.taskWriterFactory.initialize(subTaskId, attemptId);
-
-        // Initialize the task writer.
-        this.writer = taskWriterFactory.create();
-
-        // Initialize metric
-        MetricOption metricOption = MetricOption.builder()
-                .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
-                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
-                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
-                .withRegisterMetric(RegisteredMetric.ALL)
-                .build();
-        if (metricOption != null) {
-            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
-        }
-    }
-
-    @Override
-    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
-        // close all open files and emit files to downstream committer operator
-        emit(writer.complete());
-
-        this.writer = taskWriterFactory.create();
-    }
-
-    @Override
-    public void processElement(StreamRecord<T> element) throws Exception {
-        writer.write(element.getValue());
-
-        if (metricData != null) {
-            metricData.invokeWithEstimate(element.getValue());
-        }
-    }
-
-    @Override
-    public void initializeState(StateInitializationContext context) throws Exception {
-        super.initializeState(context);
-        // init metric state
-        if (this.inlongMetric != null) {
-            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
-                    new ListStateDescriptor<>(
-                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
-                    })));
-        }
-        if (context.isRestored()) {
-            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
-                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
-        }
-    }
-
-    @Override
-    public void snapshotState(StateSnapshotContext context) throws Exception {
-        super.snapshotState(context);
-        if (metricData != null && metricStateListState != null) {
-            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
-                    getRuntimeContext().getIndexOfThisSubtask());
-        }
-    }
-
-    @Override
-    public void dispose() throws Exception {
-        super.dispose();
-        if (writer != null) {
-            writer.close();
-            writer = null;
-        }
-    }
-
-    @Override
-    public void endInput() throws IOException {
-        // For bounded stream, it may don't enable the checkpoint mechanism so we'd better to emit the remaining
-        // completed files to downstream before closing the writer so that we won't miss any of them.
-        emit(writer.complete());
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("table_name", fullTableName)
-                .add("subtask_id", subTaskId)
-                .add("attempt_id", attemptId)
-                .toString();
-    }
-
-    private void emit(WriteResult result) {
-        output.collect(new StreamRecord<>(result));
-    }
-}
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
+public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, WriteResult>
+        implements CheckpointedFunction, SchemaEvolutionFunction<TaskWriterFactory<T>> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String fullTableName;
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    private TaskWriterFactory<T> taskWriterFactory;
+
+    private transient TaskWriter<T> writer;
+    private transient int subTaskId;
+    private transient int attemptId;
+    @Nullable
+    private transient SinkMetricData metricData;
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
+
+    public IcebergSingleStreamWriter(
+            String fullTableName,
+            TaskWriterFactory<T> taskWriterFactory,
+            String inlongMetric,
+            String auditHostAndPorts) {
+        this.fullTableName = fullTableName;
+        this.taskWriterFactory = taskWriterFactory;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+        this.attemptId = getRuntimeContext().getAttemptNumber();
+
+        // Initialize the task writer factory.
+        this.taskWriterFactory.initialize(subTaskId, attemptId);
+
+        // Initialize the task writer.
+        this.writer = taskWriterFactory.create();
+
+        // Initialize metric
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        // close all open files and emit files to downstream committer operator
+        emit(writer.complete());
+
+        this.writer = taskWriterFactory.create();
+    }
+
+    @Override
+    public void processElement(T value)
+            throws Exception {
+        writer.write(value);
+
+        if (metricData != null) {
+            metricData.invokeWithEstimate(value);
+        }
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        // init metric state
+        if (this.inlongMetric != null) {
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                    })));
+        }
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        if (metricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
+    }
+
+    @Override
+    public void dispose() throws Exception {
+        if (writer != null) {
+            writer.close();
+            writer = null;
+        }
+    }
+
+    @Override
+    public void endInput() throws IOException {
+        // For a bounded stream, checkpointing may not be enabled, so emit the remaining completed files to the
+        // downstream before the writer is closed so that none of them are missed.
+        emit(writer.complete());
+    }
+
+    @Override
+    public void schemaEvolution(TaskWriterFactory<T> schema) throws IOException {
+        emit(writer.complete());
+
+        taskWriterFactory = schema;
+        taskWriterFactory.initialize(subTaskId, attemptId);
+        writer = taskWriterFactory.create();
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("table_name", fullTableName)
+                .add("subtask_id", subTaskId)
+                .add("attempt_id", attemptId)
+                .toString();
+    }
+
+    private void emit(WriteResult result) {
+        collector.collect(result);
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/MultipleWriteResult.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/MultipleWriteResult.java
new file mode 100644
index 000000000..4b004a140
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/MultipleWriteResult.java
@@ -0,0 +1,41 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.WriteResult;
+
+public class MultipleWriteResult {
+    private TableIdentifier tableId;
+
+    private WriteResult writeResult;
+
+    public MultipleWriteResult(TableIdentifier tableId, WriteResult writeResult) {
+        this.tableId = tableId;
+        this.writeResult = writeResult;
+    }
+
+    public TableIdentifier getTableId() {
+        return tableId;
+    }
+
+    public WriteResult getWriteResult() {
+        return writeResult;
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java
new file mode 100644
index 000000000..bc08a4e40
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java
@@ -0,0 +1,171 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types.BinaryType;
+import org.apache.iceberg.types.Types.BooleanType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.FloatType;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.types.Types.TimeType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.types.Types.UUIDType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+public class RecordWithSchema {
+
+    public RecordWithSchema(
+            JsonNode originalData,
+            Schema schema,
+            TableIdentifier tableId,
+            List<String> primaryKeys) {
+        this.originalData = originalData;
+        this.schema = schema;
+        this.tableId = tableId;
+        this.primaryKeys = primaryKeys;
+    }
+
+    private transient JsonNode originalData;
+
+    private List<RowData> data;
+
+    private Schema schema;
+
+    private final TableIdentifier tableId;
+
+    private final List<String> primaryKeys;
+
+    public List<RowData> getData() {
+        return data;
+    }
+
+    public Schema getSchema() {
+        return schema;
+    }
+
+    public TableIdentifier getTableId() {
+        return tableId;
+    }
+
+    public List<String> getPrimaryKeys() {
+        return primaryKeys;
+    }
+
+    public RecordWithSchema refreshFieldId(Schema newSchema) {
+        // The schema parsed from the data has no field ids, so replace it with the projection of newSchema
+        // (the schema loaded from the catalog) onto the same column names.
+        schema = newSchema.select(schema.columns().stream().map(NestedField::name).collect(Collectors.toList()));
+        return this;
+    }
+
+    public RecordWithSchema refreshRowData(BiFunction<JsonNode, Schema, List<RowData>> rowDataExtractor) {
+        // Avoid type errors during downstream parsing by re-extracting the row data against the current schema.
+        data = rowDataExtractor.apply(originalData, schema);
+        return this;
+    }
+
+    // todo: RecordWithSchema is deserialized from the network here, so its `Schema` is a new object with the same
+    //       `Type`s. However, `Type` does not implement equals, so comparing this schema with the Table#schema
+    //       loaded from the catalog can give unexpected results. For example, Schema#sameSchema returns false
+    //       even though the schemas are the same, and a Type's comparators can't be obtained for an equal type.
+    public void replaceSchema() {
+        List<NestedField> columns = schema.columns();
+        List<NestedField> newColumns = new ArrayList<>();
+        for (int i = 0; i < columns.size(); i++) {
+            newColumns.add(replaceField(columns.get(i)));
+        }
+        schema = new Schema(schema.schemaId(), newColumns, schema.getAliases(), schema.identifierFieldIds());
+    }
+
+    private static NestedField replaceField(NestedField nestedField) {
+        return NestedField.of(
+                nestedField.fieldId(),
+                nestedField.isOptional(),
+                nestedField.name(),
+                replaceType(nestedField.type()),
+                nestedField.doc());
+    }
+
+    private static Type replaceType(Type type) {
+        switch (type.typeId()) {
+            case BOOLEAN:
+                return BooleanType.get();
+            case INTEGER:
+                return IntegerType.get();
+            case LONG:
+                return LongType.get();
+            case FLOAT:
+                return FloatType.get();
+            case DOUBLE:
+                return DoubleType.get();
+            case DATE:
+                return DateType.get();
+            case TIME:
+                return TimeType.get();
+            case TIMESTAMP:
+                return ((TimestampType) type).shouldAdjustToUTC()
+                        ? TimestampType.withZone() : TimestampType.withoutZone();
+            case STRING:
+                return StringType.get();
+            case UUID:
+                return UUIDType.get();
+            case BINARY:
+                return BinaryType.get();
+            case FIXED:
+            case DECIMAL:
+                return type;
+            case STRUCT:
+                return StructType.of(
+                        ((StructType) type).fields()
+                                .stream()
+                                .map(RecordWithSchema::replaceField)
+                                .collect(Collectors.toList()));
+            case LIST:
+                ListType listType = ((ListType) type);
+                return listType.isElementOptional()
+                        ? ListType.ofOptional(listType.elementId(), replaceType(listType.elementType()))
+                        : ListType.ofRequired(listType.elementId(), replaceType(listType.elementType()));
+            case MAP:
+                MapType mapType = ((MapType) type);
+                return mapType.isValueOptional()
+                        ? MapType.ofOptional(mapType.keyId(), mapType.valueId(),
+                                replaceType(mapType.keyType()), replaceType(mapType.valueType()))
+                        : MapType.ofRequired(mapType.keyId(), mapType.valueId(),
+                                replaceType(mapType.keyType()), replaceType(mapType.valueType()));
+            default:
+                throw new UnsupportedOperationException("Unspported type: " + type);
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
new file mode 100644
index 000000000..b73d27519
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java
@@ -0,0 +1,136 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.sink.TableChange;
+import org.apache.inlong.sort.iceberg.FlinkTypeToType;
+import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
+import org.apache.inlong.sort.base.sink.TableChange.ColumnPosition;
+import org.apache.inlong.sort.base.sink.TableChange.UnknownColumnChange;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SchemaChangeUtils {
+    private static final Joiner DOT = Joiner.on(".");
+
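+    /**
+     * Compares two schemas and returns the changes needed to turn the old schema into the new one.
+     * TODO: currently only adding columns is supported
+     *
+     * @param oldSchema the schema before the change
+     * @param newSchema the schema after the change
+     * @return the AddColumn changes, or one UnknownColumnChange if some old column could not be matched in order
+     */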
+    static List<TableChange> diffSchema(Schema oldSchema, Schema newSchema) {
+        List<String> oldFields = oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
+        List<String> newFields = newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
+        int oi = 0;
+        int ni = 0;
+        List<TableChange> tableChanges = new ArrayList<>();
+        while (ni < newFields.size()) {
+            if (oi < oldFields.size() && oldFields.get(oi).equals(newFields.get(ni))) {
+                oi++;
+                ni++;
+            } else {
+                NestedField newField = newSchema.findField(newFields.get(ni));
+                tableChanges.add(
+                        new AddColumn(
+                                new String[]{newField.name()},
+                                FlinkSchemaUtil.convert(newField.type()),
+                                !newField.isRequired(),
+                                newField.doc(),
+                                ni == 0 ? ColumnPosition.first() : ColumnPosition.after(newFields.get(ni - 1))));
+                ni++;
+            }
+        }
+
+        if (oi != oldFields.size()) {
+            tableChanges.clear();
+            tableChanges.add(
+                    new UnknownColumnChange(
+                            String.format("Unsupported schema update.\n"
+                                    + "oldSchema:\n%s\n, newSchema:\n %s", oldSchema, newSchema)));
+        }
+
+        return tableChanges;
+    }
+
+    public static void applySchemaChanges(UpdateSchema pendingUpdate, List<TableChange> tableChanges) {
+        for (TableChange change : tableChanges) {
+            if (change instanceof TableChange.AddColumn) {
+                apply(pendingUpdate, (TableChange.AddColumn) change);
+            } else {
+                throw new UnsupportedOperationException("Cannot apply unknown table change: " + change);
+            }
+        }
+        pendingUpdate.commit();
+    }
+
+    public static void apply(UpdateSchema pendingUpdate, TableChange.AddColumn add) {
+        Preconditions.checkArgument(add.isNullable(),
+                "Incompatible change: cannot add required column: %s", leafName(add.fieldNames()));
+        Type type = add.dataType().accept(new FlinkTypeToType(RowType.of(add.dataType())));
+        pendingUpdate.addColumn(parentName(add.fieldNames()), leafName(add.fieldNames()), type, add.comment());
+
+        if (add.position() instanceof TableChange.After) {
+            TableChange.After after = (TableChange.After) add.position();
+            String referenceField = peerName(add.fieldNames(), after.column());
+            pendingUpdate.moveAfter(DOT.join(add.fieldNames()), referenceField);
+
+        } else if (add.position() instanceof TableChange.First) {
+            pendingUpdate.moveFirst(DOT.join(add.fieldNames()));
+
+        } else {
+            Preconditions.checkArgument(add.position() == null,
+                    "Cannot add '%s' at unknown position: %s", DOT.join(add.fieldNames()), add.position());
+        }
+    }
+
+    public static String leafName(String[] fieldNames) {
+        Preconditions.checkArgument(fieldNames.length > 0, "Invalid field name: at least one name is required");
+        return fieldNames[fieldNames.length - 1];
+    }
+
+    public static String peerName(String[] fieldNames, String fieldName) {
+        if (fieldNames.length > 1) {
+            String[] peerNames = Arrays.copyOf(fieldNames, fieldNames.length);
+            peerNames[fieldNames.length - 1] = fieldName;
+            return DOT.join(peerNames);
+        }
+        return fieldName;
+    }
+
+    public static String parentName(String[] fieldNames) {
+        if (fieldNames.length > 1) {
+            return DOT.join(Arrays.copyOfRange(fieldNames, 0, fieldNames.length - 1));
+        }
+        return null;
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaEvolutionFunction.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaEvolutionFunction.java
new file mode 100644
index 000000000..aee6ed13e
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaEvolutionFunction.java
@@ -0,0 +1,23 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.iceberg.sink.multiple;
+
+public interface SchemaEvolutionFunction<Schema> {
+    void schemaEvolution(Schema schema) throws Exception;
+}