Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/18 09:08:42 UTC

[inlong] branch master updated: [INLONG-7829][Sort] Add mini-batch pre aggregate by partition when ingesting data into iceberg (#7832)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ee18c8f9 [INLONG-7829][Sort] Add mini-batch pre aggregate by partition when ingesting data into iceberg (#7832)
3ee18c8f9 is described below

commit 3ee18c8f916338211330936821a047c9c7dbdfa0
Author: thexia <37...@users.noreply.github.com>
AuthorDate: Tue Apr 18 17:08:36 2023 +0800

    [INLONG-7829][Sort] Add mini-batch pre aggregate by partition when ingesting data into iceberg (#7832)
    
    Co-authored-by: thexiay <xi...@gmail.com>
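    
    A minimal usage sketch of the two new table options (the table name, schema and
    connector identifier below are hypothetical; only 'write.mini-batch.enable' and
    'write.parallelism' come from this change):
    
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;
    
        public class MiniBatchIcebergSinkExample {
            public static void main(String[] args) {
                TableEnvironment tEnv = TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());
                // Enable partition-grouped mini-batch buffering and pin the writer parallelism.
                tEnv.executeSql(
                        "CREATE TABLE iceberg_sink (\n"
                                + "  id BIGINT,\n"
                                + "  name STRING,\n"
                                + "  dt STRING,\n"
                                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                                + ") PARTITIONED BY (dt) WITH (\n"
                                + "  'connector' = 'iceberg-inlong',\n"
                                + "  'write.mini-batch.enable' = 'true',\n"
                                + "  'write.parallelism' = '4'\n"
                                + ")");
            }
        }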
---
 .../sort/iceberg/FlinkDynamicTableFactory.java     |  14 ++
 .../inlong/sort/iceberg/IcebergTableSink.java      |   3 +
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  82 +++++++---
 .../sink/GroupedPartitionedDeltaWriter.java        |  90 ++++++++++
 .../sink/GroupedPartitionedFanoutWriter.java       |  85 ++++++++++
 .../sink/IcebergMiniBatchGroupOperator.java        | 181 +++++++++++++++++++++
 .../iceberg/sink/RowDataTaskWriterFactory.java     |  44 ++++-
 .../sink/multiple/IcebergMultipleStreamWriter.java |   3 +-
 8 files changed, 474 insertions(+), 28 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index a4fd3cf56..dfd5411fa 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -139,6 +139,18 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
                     .withDescription("Write record rate limit per second to"
                             + " prevent traffic jitter and improve stability, default 0 (no limit)");
 
+    public static final ConfigOption<Boolean> WRITE_MINI_BATCH_ENABLE =
+            ConfigOptions.key("write.mini-batch.enable")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to buffer some data to sort before write to files(reduce memory loss)");
+
+    public static final ConfigOption<Integer> WRITE_PARALLELISM =
+            ConfigOptions.key("write.parallelism")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("writer parallelism");
+
     private final FlinkCatalog catalog;
 
     public FlinkDynamicTableFactory() {
@@ -305,6 +317,8 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
         options.add(WRITE_COMPACT_INTERVAL);
         options.add(WRITE_DISTRIBUTION_MODE);
         options.add(WRITE_RATE_LIMIT);
+        options.add(WRITE_MINI_BATCH_ENABLE);
+        options.add(WRITE_PARALLELISM);
         return options;
     }
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index cd239633e..519bbd3f9 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -56,6 +56,7 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
 import static org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_DISTRIBUTION_MODE;
+import static org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_PARALLELISM;
 
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
@@ -133,6 +134,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
                             .withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY))
                             .withPkAutoGenerated(tableOptions.get(SINK_MULTIPLE_PK_AUTO_GENERATED))
                             .build())
+                    .writeParallelism(tableOptions.get(WRITE_PARALLELISM))
                     .dirtyOptions(dirtyOptions)
                     .dirtySink(dirtySink)
                     .action(actionsProvider)
@@ -143,6 +145,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
             return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
                     .tableLoader(tableLoader)
                     .tableSchema(tableSchema)
+                    .writeParallelism(tableOptions.get(WRITE_PARALLELISM))
                     .equalityFieldColumns(equalityColumns)
                     .overwrite(overwrite)
                     .appendMode(tableOptions.get(IGNORE_ALL_CHANGELOG))
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 7524bf5f1..c2a460115 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
 import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
@@ -36,6 +37,7 @@ import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SerializableTable;
@@ -56,7 +58,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PropertyUtil;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
@@ -77,10 +78,11 @@ import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
-import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
-import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
+import static org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_MINI_BATCH_ENABLE;
 import static org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_RATE_LIMIT;
 
 /**
@@ -164,7 +166,6 @@ public class FlinkSink {
         private boolean upsert = false;
         private List<String> equalityFieldColumns = null;
         private String uidPrefix = null;
-        private ReadableConfig readableConfig = new Configuration();
         private final Map<String, String> writeOptions = Maps.newHashMap();
         private FlinkWriteConf flinkWriteConf = null;
         private String inlongMetric = null;
@@ -174,7 +175,7 @@ public class FlinkSink {
         private MultipleSinkOption multipleSinkOption = null;
         private DirtyOptions dirtyOptions;
         private @Nullable DirtySink<Object> dirtySink;
-        private ReadableConfig tableOptions;
+        private ReadableConfig tableOptions = new Configuration();
 
         private Builder() {
         }
@@ -259,11 +260,6 @@ public class FlinkSink {
             return this;
         }
 
-        public Builder flinkConf(ReadableConfig config) {
-            this.readableConfig = config;
-            return this;
-        }
-
         /**
          * The appendMode properties is used to insert data without equality field columns.
          *
@@ -416,7 +412,7 @@ public class FlinkSink {
                 }
             }
 
-            flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig);
+            flinkWriteConf = new FlinkWriteConf(table, writeOptions, tableOptions);
 
             // Find out the equality field id list based on the user-provided equality field column names.
             List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
@@ -430,12 +426,9 @@ public class FlinkSink {
                     distributeDataStream(
                             rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType);
 
-            // Add rate limit if necessary
-            DataStream<RowData> inputWithRateLimit = appendWithRateLimit(distributeStream);
-
             // Add parallel writers that append rows to files
             SingleOutputStreamOperator<WriteResult> writerStream =
-                    appendWriter(inputWithRateLimit, flinkRowType, equalityFieldIds);
+                    appendWriter(distributeStream, flinkRowType, equalityFieldIds);
 
             // Add single-parallelism committer that commits files
             // after successful checkpoint or end of input
@@ -536,10 +529,11 @@ public class FlinkSink {
                 return input;
             }
 
+            int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
             SingleOutputStreamOperator<T> inputWithRateLimit = input
                     .map(new RateLimitMapFunction(tableOptions.get(WRITE_RATE_LIMIT)))
                     .name("rate_limit")
-                    .setParallelism(input.getParallelism());
+                    .setParallelism(parallelism);
 
             if (uidPrefix != null) {
                 ((SingleOutputStreamOperator) inputWithRateLimit).uid(uidPrefix + "_rate_limit");
@@ -547,6 +541,44 @@ public class FlinkSink {
             return inputWithRateLimit;
         }
 
+        /**
+         * Buffers a mini batch of data, grouped by equality fields and partition, before writing, to save memory.
+         *
+         * This operator must have the same degree of parallelism as the writer operator so that all rows sharing
+         * the same equality fields within a checkpoint period are processed by exactly one writer subtask
+         * (rather than being processed multiple times).
+         *
+         * @param input the input stream of rows
+         * @return the stream with the mini-batch group operator appended, or the input unchanged if disabled
+         */
+        private DataStream<RowData> appendWithMiniBatchGroup(DataStream<RowData> input,
+                RowType flinkType,
+                Set<Integer> equalityFieldIds) {
+            if (!tableOptions.get(WRITE_MINI_BATCH_ENABLE)) {
+                return input;
+            }
+
+            FieldGetter[] fieldGetters =
+                    IntStream.range(0, flinkType.getFieldCount())
+                            .mapToObj(i -> RowData.createFieldGetter(flinkType.getTypeAt(i), i))
+                            .toArray(RowData.FieldGetter[]::new);
+            Schema writeSchema = TypeUtil.reassignIds(
+                    FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkType)), table.schema());
+            Schema deleteSchema = TypeUtil.select(table.schema(), equalityFieldIds);
+            PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+
+            int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
+            SingleOutputStreamOperator<RowData> inputWithMiniBatchGroup = input
+                    .transform("mini_batch_group",
+                            input.getType(),
+                            new IcebergMiniBatchGroupOperator(fieldGetters, deleteSchema, writeSchema, partitionKey))
+                    .setParallelism(parallelism);
+            if (uidPrefix != null) {
+                inputWithMiniBatchGroup.uid(uidPrefix + "mini_batch_group");
+            }
+            return inputWithMiniBatchGroup;
+        }
+
         private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
             IcebergProcessOperator<WriteResult, Void> filesCommitter = new IcebergProcessOperator<>(
                     new IcebergSingleFileCommiter(
@@ -584,8 +616,7 @@ public class FlinkSink {
                 List<Integer> equalityFieldIds) {
             // Fallback to use upsert mode parsed from table properties if don't specify in job level.
             // Only if not appendMode, upsert can be valid.
-            boolean upsertMode = (flinkWriteConf.upsertMode() || PropertyUtil.propertyAsBoolean(table.properties(),
-                    UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT)) && !appendMode;
+            boolean upsertMode = flinkWriteConf.upsertMode() && !appendMode;
 
             // Validate the equality fields and partition fields if we enable the upsert mode.
             if (upsertMode) {
@@ -602,12 +633,17 @@ public class FlinkSink {
                 }
             }
 
+            // Add rate limit if necessary
+            DataStream<RowData> inputWithRateLimit = appendWithRateLimit(input);
+            DataStream<RowData> inputWithMiniBatch = appendWithMiniBatchGroup(
+                    inputWithRateLimit, flinkRowType, equalityFieldIds.stream().collect(Collectors.toSet()));
+
             IcebergProcessOperator<RowData, WriteResult> streamWriter = createStreamWriter(
                     table, flinkRowType, equalityFieldIds, flinkWriteConf, appendMode, inlongMetric,
-                    auditHostAndPorts, dirtyOptions, dirtySink);
+                    auditHostAndPorts, dirtyOptions, dirtySink, tableOptions.get(WRITE_MINI_BATCH_ENABLE));
 
             int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
-            SingleOutputStreamOperator<WriteResult> writerStream = input
+            SingleOutputStreamOperator<WriteResult> writerStream = inputWithMiniBatch
                     .transform(operatorName(ICEBERG_STREAM_WRITER_NAME),
                             TypeInformation.of(WriteResult.class),
                             streamWriter)
@@ -742,7 +778,8 @@ public class FlinkSink {
             String inlongMetric,
             String auditHostAndPorts,
             DirtyOptions dirtyOptions,
-            @Nullable DirtySink<Object> dirtySink) {
+            @Nullable DirtySink<Object> dirtySink,
+            boolean miniBatchMode) {
         // flink A, iceberg a
         Preconditions.checkArgument(table != null, "Iceberg table should't be null");
 
@@ -756,7 +793,8 @@ public class FlinkSink {
                         flinkWriteConf.dataFileFormat(),
                         equalityFieldIds,
                         flinkWriteConf.upsertMode(),
-                        appendMode);
+                        appendMode,
+                        miniBatchMode);
 
         return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>(
                 table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts,
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/GroupedPartitionedDeltaWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/GroupedPartitionedDeltaWriter.java
new file mode 100644
index 000000000..c72fc2d43
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/GroupedPartitionedDeltaWriter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class GroupedPartitionedDeltaWriter extends BaseDeltaTaskWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GroupedPartitionedDeltaWriter.class);
+
+    private final PartitionKey partitionKey;
+
+    private String latestPartitionPath;
+
+    private RowDataDeltaWriter latestWriter;
+
+    GroupedPartitionedDeltaWriter(PartitionSpec spec,
+            FileFormat format,
+            FileAppenderFactory<RowData> appenderFactory,
+            OutputFileFactory fileFactory,
+            FileIO io,
+            long targetFileSize,
+            Schema schema,
+            RowType flinkSchema,
+            List<Integer> equalityFieldIds,
+            boolean upsert) {
+        super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+                upsert);
+        this.partitionKey = new PartitionKey(spec, schema);
+    }
+
+    @Override
+    public RowDataDeltaWriter route(RowData row) {
+        partitionKey.partition(wrapper().wrap(row));
+        if (latestPartitionPath != null && partitionKey.toPath().equals(latestPartitionPath)) {
+            return latestWriter;
+        }
+
+        closeCurrentWriter();
+        latestPartitionPath = partitionKey.toPath();
+        latestWriter = new RowDataDeltaWriter(partitionKey.copy());
+        return latestWriter;
+    }
+
+    @Override
+    public void close() {
+        closeCurrentWriter();
+        latestWriter = null;
+        latestPartitionPath = null;
+    }
+
+    private void closeCurrentWriter() {
+        if (latestWriter != null) {
+            try {
+                latestWriter.close();
+            } catch (IOException e) {
+                LOG.error("Exception occur when closing file {}.", latestPartitionPath);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/GroupedPartitionedFanoutWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/GroupedPartitionedFanoutWriter.java
new file mode 100644
index 000000000..ff3f0bf81
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/GroupedPartitionedFanoutWriter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.io.BaseTaskWriter;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public abstract class GroupedPartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GroupedPartitionedFanoutWriter.class);
+
+    private String latestPartitionPath;
+
+    private RollingFileWriter latestWriter;
+
+    protected GroupedPartitionedFanoutWriter(PartitionSpec spec, FileFormat format,
+            FileAppenderFactory<T> appenderFactory,
+            OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+    }
+
+    /**
+     * Create a PartitionKey from the values in row.
+     * <p>
+     * Any PartitionKey returned by this method can be reused by the implementation.
+     *
+     * @param row a data row
+     */
+    protected abstract PartitionKey partition(T row);
+
+    @Override
+    public void write(T row) throws IOException {
+        PartitionKey partitionKey = partition(row);
+        if (latestPartitionPath == null || !partitionKey.toPath().equals(latestPartitionPath)) {
+            // NOTICE: we need to copy the partition key here, to avoid messing up the keys held by existing writers.
+            closeCurrentWriter();
+            latestWriter = new RollingFileWriter(partitionKey.copy());
+            latestPartitionPath = partitionKey.toPath();
+        }
+
+        latestWriter.write(row);
+    }
+
+    @Override
+    public void close() throws IOException {
+        closeCurrentWriter();
+        latestWriter = null;
+        latestPartitionPath = null;
+    }
+
+    private void closeCurrentWriter() {
+        if (latestWriter != null) {
+            try {
+                latestWriter.close();
+            } catch (IOException e) {
+                LOG.error("Exception occur when closing file {}.", latestPartitionPath);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
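
The grouped fanout writer above keeps only the latest partition's file open and rolls to a new
file whenever the partition path changes, so it relies on the upstream mini-batch operator
handing it rows already grouped by partition. A standalone sketch (not part of the commit;
class and method names are made up) of how input ordering drives the number of writer
roll-overs:

    import java.util.Arrays;
    import java.util.List;

    final class PartitionSwitchSketch {

        // Mirrors the partition-change check in GroupedPartitionedFanoutWriter#write:
        // each change of partition path closes the current writer and opens a new one.
        static int countWriterRollovers(List<String> partitionPaths) {
            int rollovers = 0;
            String latestPartitionPath = null;
            for (String path : partitionPaths) {
                if (latestPartitionPath == null || !path.equals(latestPartitionPath)) {
                    rollovers++;
                    latestPartitionPath = path;
                }
            }
            return rollovers;
        }

        public static void main(String[] args) {
            // Grouped input: one roll-over per partition.
            System.out.println(countWriterRollovers(Arrays.asList("dt=1", "dt=1", "dt=2", "dt=2"))); // 2
            // Interleaved input: a roll-over on almost every row, defeating the purpose.
            System.out.println(countWriterRollovers(Arrays.asList("dt=1", "dt=2", "dt=1", "dt=2"))); // 4
        }
    }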
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergMiniBatchGroupOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergMiniBatchGroupOperator.java
new file mode 100644
index 000000000..b862cee32
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergMiniBatchGroupOperator.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.types.Types.NestedField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class IcebergMiniBatchGroupOperator extends TableStreamOperator<RowData>
+        implements
+            OneInputStreamOperator<RowData, RowData>,
+            BoundedOneInput {
+
+    private static final long serialVersionUID = 9042068324817807379L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergMiniBatchGroupOperator.class);
+
+    private transient StreamRecordCollector<RowData> collector;
+    private transient Map<Tuple2<String, RowData>, RowData> inputBuffer;
+    private transient RowDataWrapper wrapper;
+
+    private final FieldGetter[] fieldsGetter;
+    private final int[] equalityFieldIndex; // the ordered positions of the equality fields in the row schema
+    private final PartitionKey partitionKey; // partition key helper
+    private final Schema rowSchema; // the whole field schema
+
+    /**
+     * Initialize field index.
+     *
+     * @param fieldsGetter function to get object from {@link RowData}
+     * @param deleteSchema equality fields schema
+     * @param rowSchema row data schema
+     * @param partitionKey partition key
+     */
+    public IcebergMiniBatchGroupOperator(
+            FieldGetter[] fieldsGetter,
+            Schema deleteSchema,
+            Schema rowSchema,
+            PartitionKey partitionKey) {
+        this.fieldsGetter = fieldsGetter;
+        // note: NestedField does not override equals(), so we cannot use indexOf with NestedField directly
+        this.equalityFieldIndex = deleteSchema.columns().stream()
+                .map(field -> rowSchema.columns()
+                        .stream()
+                        .map(NestedField::fieldId)
+                        .collect(Collectors.toList())
+                        .indexOf(field.fieldId()))
+                .sorted()
+                .mapToInt(Integer::valueOf)
+                .toArray();
+        this.partitionKey = partitionKey;
+        this.rowSchema = rowSchema;
+        // Sanity check: every equality field must resolve to an index within the range of the row fields.
+        Preconditions.checkArgument(
+                Arrays.stream(equalityFieldIndex)
+                        .allMatch(index -> index >= 0 && index < fieldsGetter.length),
+                String.format("Any equality field index (%s) should in a legal range.",
+                        Arrays.toString(equalityFieldIndex)));
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        LOG.info("Opening IcebergMiniBatchGroupOperator");
+
+        this.collector = new StreamRecordCollector<>(output);
+        this.wrapper = new RowDataWrapper(FlinkSchemaUtil.convert(rowSchema), rowSchema.asStruct());
+        this.inputBuffer = new HashMap<>();
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData row = element.getValue();
+        RowData primaryKey = GenericRowData.of(Arrays.stream(equalityFieldIndex)
+                .boxed()
+                .map(index -> fieldsGetter[index].getFieldOrNull(row))
+                .toArray(Object[]::new));
+        partitionKey.partition(wrapper.wrap(row));
+
+        if (RowDataUtil.isAccumulateMsg(row)) {
+            inputBuffer.put(new Tuple2<>(partitionKey.toPath(), primaryKey), row);
+        } else {
+            inputBuffer.remove(new Tuple2<>(partitionKey.toPath(), primaryKey));
+        }
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        super.prepareSnapshotPreBarrier(checkpointId);
+        flush();
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        flush();
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        flush();
+    }
+
+    private void flush() throws Exception {
+        LOG.info("Flushing IcebergMiniBatchGroupOperator.");
+        // Emit the buffered rows grouped by partition, so that all rows belonging to one partition
+        // are forwarded together before moving on to the next partition.
+        if (!inputBuffer.isEmpty()) {
+            // Group the buffered rows by partition path
+            Map<String, List<RowData>> map0 = inputBuffer.entrySet()
+                    .stream()
+                    .<Map<String, List<RowData>>>reduce(
+                            new HashMap<>(),
+                            (map, record) -> {
+                                String partition = record.getKey().f0;
+                                map.compute(partition, (String par, List<RowData> oldList) -> {
+                                    if (oldList == null) {
+                                        List<RowData> list = new ArrayList<>();
+                                        list.add(record.getValue());
+                                        return list;
+                                    }
+
+                                    oldList.add(record.getValue());
+                                    return oldList;
+                                });
+                                return map;
+                            },
+                            (map1, map2) -> {
+                                for (String key : map2.keySet()) {
+                                    if (!map1.containsKey(key)) {
+                                        map1.put(key, map2.get(key));
+                                    } else {
+                                        map1.get(key).addAll(map2.get(key));
+                                    }
+                                }
+                                return map1;
+                            });
+            map0.values()
+                    .forEach(
+                            list -> list.forEach(record -> collector.collect(record)));
+        }
+        inputBuffer.clear();
+    }
+}
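
The manual stream reduce in flush() above amounts to grouping the buffered entries by the
partition path stored in the first field of the Tuple2 key. A standalone sketch of that same
grouping using Collectors.groupingBy (helper class name is made up; the buffer layout matches
the operator's inputBuffer field):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.data.RowData;

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    final class MiniBatchGroupingSketch {

        // Groups (partitionPath, primaryKey) -> row entries by partition path, producing
        // one list of rows per partition, which is what flush() emits partition by partition.
        static Map<String, List<RowData>> groupByPartition(Map<Tuple2<String, RowData>, RowData> buffer) {
            return buffer.entrySet().stream()
                    .collect(Collectors.groupingBy(
                            entry -> entry.getKey().f0,
                            Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
        }
    }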
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
index 52559a803..44341feff 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
@@ -56,6 +56,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
     private final List<Integer> equalityFieldIds;
     private final boolean upsert;
     private final boolean appendMode;
+    private final boolean miniBatchMode;
     private final FileAppenderFactory<RowData> appenderFactory;
 
     private transient OutputFileFactory outputFileFactory;
@@ -67,7 +68,8 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
             FileFormat format,
             List<Integer> equalityFieldIds,
             boolean upsert,
-            boolean appendMode) {
+            boolean appendMode,
+            boolean miniBatchMode) {
         this.table = table;
         this.schema = scheam;
         this.flinkSchema = flinkSchema;
@@ -78,6 +80,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
         this.equalityFieldIds = equalityFieldIds;
         this.upsert = upsert;
         this.appendMode = appendMode;
+        this.miniBatchMode = miniBatchMode;
 
         if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
             this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
@@ -110,8 +113,13 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
                 return new UnpartitionedWriter<>(
                         spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes);
             } else {
-                return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
-                        io, targetFileSizeBytes, schema, flinkSchema);
+                if (miniBatchMode) {
+                    return new RowDataGroupedPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
+                            io, targetFileSizeBytes, schema, flinkSchema);
+                } else {
+                    return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
+                            io, targetFileSizeBytes, schema, flinkSchema);
+                }
             }
         } else {
             // Initialize a task writer to write both INSERT and equality DELETE.
@@ -119,8 +127,13 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
                 return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
                         targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
             } else {
-                return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
-                        targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
+                if (miniBatchMode) {
+                    return new GroupedPartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
+                            targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
+                } else {
+                    return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
+                            targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
+                }
             }
         }
     }
@@ -145,4 +158,25 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
             return partitionKey;
         }
     }
+
+    private static class RowDataGroupedPartitionedFanoutWriter extends GroupedPartitionedFanoutWriter<RowData> {
+
+        private final PartitionKey partitionKey;
+        private final RowDataWrapper rowDataWrapper;
+
+        RowDataGroupedPartitionedFanoutWriter(
+                PartitionSpec spec, FileFormat format, FileAppenderFactory<RowData> appenderFactory,
+                OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema,
+                RowType flinkSchema) {
+            super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+            this.partitionKey = new PartitionKey(spec, schema);
+            this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+        }
+
+        @Override
+        protected PartitionKey partition(RowData row) {
+            partitionKey.partition(rowDataWrapper.wrap(row));
+            return partitionKey;
+        }
+    }
 }
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 864ce85da..3e29f20fc 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -219,7 +219,8 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
                     fileFormat,
                     equalityFieldIds,
                     upsertMode,
-                    appendMode);
+                    appendMode,
+                    false);
 
             if (multipleWriters.get(tableId) == null) {
                 StringBuilder subWriterInlongMetric = new StringBuilder(inlongMetric);