Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/18 09:08:42 UTC
[inlong] branch master updated: [INLONG-7829][Sort] Add mini-batch pre aggregate by partition when ingesting data into iceberg (#7832)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3ee18c8f9 [INLONG-7829][Sort] Add mini-batch pre aggregate by partition when ingesting data into iceberg (#7832)
3ee18c8f9 is described below
commit 3ee18c8f916338211330936821a047c9c7dbdfa0
Author: thexia <37...@users.noreply.github.com>
AuthorDate: Tue Apr 18 17:08:36 2023 +0800
[INLONG-7829][Sort] Add mini-batch pre aggregate by partition when ingesting data into iceberg (#7832)
Co-authored-by: thexiay <xi...@gmail.com>
---
.../sort/iceberg/FlinkDynamicTableFactory.java | 14 ++
.../inlong/sort/iceberg/IcebergTableSink.java | 3 +
.../apache/inlong/sort/iceberg/sink/FlinkSink.java | 82 +++++++---
.../sink/GroupedPartitionedDeltaWriter.java | 90 ++++++++++
.../sink/GroupedPartitionedFanoutWriter.java | 85 ++++++++++
.../sink/IcebergMiniBatchGroupOperator.java | 181 +++++++++++++++++++++
.../iceberg/sink/RowDataTaskWriterFactory.java | 44 ++++-
.../sink/multiple/IcebergMultipleStreamWriter.java | 3 +-
8 files changed, 474 insertions(+), 28 deletions(-)
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index a4fd3cf56..dfd5411fa 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -139,6 +139,18 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
.withDescription("Write record rate limit per second to"
+ " prevent traffic jitter and improve stability, default 0 (no limit)");
+ public static final ConfigOption<Boolean> WRITE_MINI_BATCH_ENABLE =
+ ConfigOptions.key("write.mini-batch.enable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to buffer some data to sort before write to files(reduce memory loss)");
+
+ public static final ConfigOption<Integer> WRITE_PARALLELISM =
+ ConfigOptions.key("write.parallelism")
+ .intType()
+ .noDefaultValue()
+ .withDescription("writer parallelism");
+
private final FlinkCatalog catalog;
public FlinkDynamicTableFactory() {
@@ -305,6 +317,8 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
options.add(WRITE_COMPACT_INTERVAL);
options.add(WRITE_DISTRIBUTION_MODE);
options.add(WRITE_RATE_LIMIT);
+ options.add(WRITE_MINI_BATCH_ENABLE);
+ options.add(WRITE_PARALLELISM);
return options;
}
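For context, the two new options are declared and read through Flink's ConfigOption API. Below is a minimal, self-contained sketch (not part of this commit; the class name is a hypothetical stand-in) showing how the declarations above resolve defaults and explicit values:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class MiniBatchOptionsExample {
    public static void main(String[] args) {
        // Same declarations as the commit adds to FlinkDynamicTableFactory.
        ConfigOption<Boolean> miniBatchEnable = ConfigOptions.key("write.mini-batch.enable")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to buffer and group data by partition before writing to files");
        ConfigOption<Integer> writeParallelism = ConfigOptions.key("write.parallelism")
                .intType()
                .noDefaultValue()
                .withDescription("Writer parallelism");

        Configuration tableOptions = new Configuration();
        System.out.println(tableOptions.get(miniBatchEnable));  // false (the declared default)

        tableOptions.set(miniBatchEnable, true);
        tableOptions.set(writeParallelism, 4);
        System.out.println(tableOptions.get(miniBatchEnable));  // true
        System.out.println(tableOptions.get(writeParallelism)); // 4
    }
}

In a SQL job these would normally be supplied via the table's options; IcebergTableSink then forwards write.parallelism to FlinkSink.Builder#writeParallelism, as the next hunks show.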
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index cd239633e..519bbd3f9 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -56,6 +56,7 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
import static org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_DISTRIBUTION_MODE;
+import static org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_PARALLELISM;
/**
* Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
@@ -133,6 +134,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
.withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY))
.withPkAutoGenerated(tableOptions.get(SINK_MULTIPLE_PK_AUTO_GENERATED))
.build())
+ .writeParallelism(tableOptions.get(WRITE_PARALLELISM))
.dirtyOptions(dirtyOptions)
.dirtySink(dirtySink)
.action(actionsProvider)
@@ -143,6 +145,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
+ .writeParallelism(tableOptions.get(WRITE_PARALLELISM))
.equalityFieldColumns(equalityColumns)
.overwrite(overwrite)
.appendMode(tableOptions.get(IGNORE_ALL_CHANGELOG))
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 7524bf5f1..c2a460115 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
@@ -36,6 +37,7 @@ import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
@@ -56,7 +58,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PropertyUtil;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.sink.MultipleSinkOption;
@@ -77,10 +78,11 @@ import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
-import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
-import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
+import static org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_MINI_BATCH_ENABLE;
import static org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_RATE_LIMIT;
/**
@@ -164,7 +166,6 @@ public class FlinkSink {
private boolean upsert = false;
private List<String> equalityFieldColumns = null;
private String uidPrefix = null;
- private ReadableConfig readableConfig = new Configuration();
private final Map<String, String> writeOptions = Maps.newHashMap();
private FlinkWriteConf flinkWriteConf = null;
private String inlongMetric = null;
@@ -174,7 +175,7 @@ public class FlinkSink {
private MultipleSinkOption multipleSinkOption = null;
private DirtyOptions dirtyOptions;
private @Nullable DirtySink<Object> dirtySink;
- private ReadableConfig tableOptions;
+ private ReadableConfig tableOptions = new Configuration();
private Builder() {
}
@@ -259,11 +260,6 @@ public class FlinkSink {
return this;
}
- public Builder flinkConf(ReadableConfig config) {
- this.readableConfig = config;
- return this;
- }
-
/**
* The appendMode properties is used to insert data without equality field columns.
*
@@ -416,7 +412,7 @@ public class FlinkSink {
}
}
- flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig);
+ flinkWriteConf = new FlinkWriteConf(table, writeOptions, tableOptions);
// Find out the equality field id list based on the user-provided equality field column names.
List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
@@ -430,12 +426,9 @@ public class FlinkSink {
distributeDataStream(
rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType);
- // Add rate limit if necessary
- DataStream<RowData> inputWithRateLimit = appendWithRateLimit(distributeStream);
-
// Add parallel writers that append rows to files
SingleOutputStreamOperator<WriteResult> writerStream =
- appendWriter(inputWithRateLimit, flinkRowType, equalityFieldIds);
+ appendWriter(distributeStream, flinkRowType, equalityFieldIds);
// Add single-parallelism committer that commits files
// after successful checkpoint or end of input
@@ -536,10 +529,11 @@ public class FlinkSink {
return input;
}
+ int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
SingleOutputStreamOperator<T> inputWithRateLimit = input
.map(new RateLimitMapFunction(tableOptions.get(WRITE_RATE_LIMIT)))
.name("rate_limit")
- .setParallelism(input.getParallelism());
+ .setParallelism(parallelism);
if (uidPrefix != null) {
((SingleOutputStreamOperator) inputWithRateLimit).uid(uidPrefix + "_rate_limit");
@@ -547,6 +541,44 @@ public class FlinkSink {
return inputWithRateLimit;
}
+ /**
+ * Buffers a mini batch of data, grouped by the equality fields and partition, before writing, in order to
+ * save memory resources.
+ *
+ * This operator must run with the same parallelism as the writer operator, so that the rows sharing the
+ * same equality fields within one checkpoint period are processed by exactly one writer subtask
+ * (rather than being processed multiple times).
+ *
+ * @param input the input stream of rows
+ * @return the stream with mini-batch grouping applied, or the input unchanged if the option is disabled
+ */
+ private DataStream<RowData> appendWithMiniBatchGroup(DataStream<RowData> input,
+ RowType flinkType,
+ Set<Integer> equalityFieldIds) {
+ if (!tableOptions.get(WRITE_MINI_BATCH_ENABLE)) {
+ return input;
+ }
+
+ FieldGetter[] fieldGetters =
+ IntStream.range(0, flinkType.getFieldCount())
+ .mapToObj(i -> RowData.createFieldGetter(flinkType.getTypeAt(i), i))
+ .toArray(RowData.FieldGetter[]::new);
+ Schema writeSchema = TypeUtil.reassignIds(
+ FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkType)), table.schema());
+ Schema deleteSchema = TypeUtil.select(table.schema(), equalityFieldIds);
+ PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+
+ int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
+ SingleOutputStreamOperator<RowData> inputWithMiniBatchGroup = input
+ .transform("mini_batch_group",
+ input.getType(),
+ new IcebergMiniBatchGroupOperator(fieldGetters, deleteSchema, writeSchema, partitionKey))
+ .setParallelism(parallelism);
+ if (uidPrefix != null) {
+ inputWithMiniBatchGroup.uid(uidPrefix + "_mini_batch_group");
+ }
+ return inputWithMiniBatchGroup;
+ }
+
private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
IcebergProcessOperator<WriteResult, Void> filesCommitter = new IcebergProcessOperator<>(
new IcebergSingleFileCommiter(
@@ -584,8 +616,7 @@ public class FlinkSink {
List<Integer> equalityFieldIds) {
// Fallback to use upsert mode parsed from table properties if don't specify in job level.
// Only if not appendMode, upsert can be valid.
- boolean upsertMode = (flinkWriteConf.upsertMode() || PropertyUtil.propertyAsBoolean(table.properties(),
- UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT)) && !appendMode;
+ boolean upsertMode = flinkWriteConf.upsertMode() && !appendMode;
// Validate the equality fields and partition fields if we enable the upsert mode.
if (upsertMode) {
@@ -602,12 +633,17 @@ public class FlinkSink {
}
}
+ // Add rate limit if necessary
+ DataStream<RowData> inputWithRateLimit = appendWithRateLimit(input);
+ DataStream<RowData> inputWithMiniBatch = appendWithMiniBatchGroup(
+ inputWithRateLimit, flinkRowType, equalityFieldIds.stream().collect(Collectors.toSet()));
+
IcebergProcessOperator<RowData, WriteResult> streamWriter = createStreamWriter(
table, flinkRowType, equalityFieldIds, flinkWriteConf, appendMode, inlongMetric,
- auditHostAndPorts, dirtyOptions, dirtySink);
+ auditHostAndPorts, dirtyOptions, dirtySink, tableOptions.get(WRITE_MINI_BATCH_ENABLE));
int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
- SingleOutputStreamOperator<WriteResult> writerStream = input
+ SingleOutputStreamOperator<WriteResult> writerStream = inputWithMiniBatch
.transform(operatorName(ICEBERG_STREAM_WRITER_NAME),
TypeInformation.of(WriteResult.class),
streamWriter)
@@ -742,7 +778,8 @@ public class FlinkSink {
String inlongMetric,
String auditHostAndPorts,
DirtyOptions dirtyOptions,
- @Nullable DirtySink<Object> dirtySink) {
+ @Nullable DirtySink<Object> dirtySink,
+ boolean miniBatchMode) {
// flink A, iceberg a
Preconditions.checkArgument(table != null, "Iceberg table should't be null");
@@ -756,7 +793,8 @@ public class FlinkSink {
flinkWriteConf.dataFileFormat(),
equalityFieldIds,
flinkWriteConf.upsertMode(),
- appendMode);
+ appendMode,
+ miniBatchMode);
return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>(
table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts,
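The javadoc on appendWithMiniBatchGroup above relies on a Flink scheduling detail: two consecutive transforms with the same parallelism and no explicit partitioner are connected with forward partitioning, so every row a grouping subtask emits lands on exactly one writer subtask, preserving the per-key grouping done upstream. A minimal runnable sketch (a hypothetical pipeline; plain map stages stand in for the grouping and writer operators):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForwardChainingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int parallelism = 2; // stands in for writeParallelism

        DataStream<String> input = env.fromElements("a", "b", "c", "d");

        // "grouping" and "writer" stages with identical parallelism: Flink uses
        // forward partitioning between them, so each grouped record is handled by
        // exactly one downstream subtask -- the invariant appendWithMiniBatchGroup needs.
        DataStream<String> grouped = input
                .map(row -> "grouped:" + row).returns(Types.STRING)
                .setParallelism(parallelism);
        grouped
                .map(row -> "written:" + row).returns(Types.STRING)
                .setParallelism(parallelism)
                .print();

        env.execute("forward-chaining-sketch");
    }
}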
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/GroupedPartitionedDeltaWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/GroupedPartitionedDeltaWriter.java
new file mode 100644
index 000000000..c72fc2d43
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/GroupedPartitionedDeltaWriter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class GroupedPartitionedDeltaWriter extends BaseDeltaTaskWriter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GroupedPartitionedDeltaWriter.class);
+
+ private final PartitionKey partitionKey;
+
+ private String latestPartitionPath;
+
+ private RowDataDeltaWriter latestWriter;
+
+ GroupedPartitionedDeltaWriter(PartitionSpec spec,
+ FileFormat format,
+ FileAppenderFactory<RowData> appenderFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSize,
+ Schema schema,
+ RowType flinkSchema,
+ List<Integer> equalityFieldIds,
+ boolean upsert) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+ upsert);
+ this.partitionKey = new PartitionKey(spec, schema);
+ }
+
+ @Override
+ public RowDataDeltaWriter route(RowData row) {
+ partitionKey.partition(wrapper().wrap(row));
+ if (latestPartitionPath != null && partitionKey.toPath().equals(latestPartitionPath)) {
+ return latestWriter;
+ }
+
+ closeCurrentWriter();
+ latestPartitionPath = partitionKey.toPath();
+ latestWriter = new RowDataDeltaWriter(partitionKey.copy());
+ return latestWriter;
+ }
+
+ @Override
+ public void close() {
+ closeCurrentWriter();
+ latestWriter = null;
+ latestPartitionPath = null;
+ }
+
+ private void closeCurrentWriter() {
+ if (latestWriter != null) {
+ try {
+ latestWriter.close();
+ } catch (IOException e) {
+ LOG.error("Exception occur when closing file {}.", latestPartitionPath);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/GroupedPartitionedFanoutWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/GroupedPartitionedFanoutWriter.java
new file mode 100644
index 000000000..ff3f0bf81
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/GroupedPartitionedFanoutWriter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.io.BaseTaskWriter;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public abstract class GroupedPartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GroupedPartitionedFanoutWriter.class);
+
+ private String latestPartitionPath;
+
+ private RollingFileWriter latestWriter;
+
+ protected GroupedPartitionedFanoutWriter(PartitionSpec spec, FileFormat format,
+ FileAppenderFactory<T> appenderFactory,
+ OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+ }
+
+ /**
+ * Create a PartitionKey from the values in row.
+ * <p>
+ * Any PartitionKey returned by this method can be reused by the implementation.
+ *
+ * @param row a data row
+ */
+ protected abstract PartitionKey partition(T row);
+
+ @Override
+ public void write(T row) throws IOException {
+ PartitionKey partitionKey = partition(row);
+ if (latestPartitionPath == null || !partitionKey.toPath().equals(latestPartitionPath)) {
+ // NOTICE: we must copy the partition key here to avoid corrupting the keys already held by writers.
+ closeCurrentWriter();
+ latestWriter = new RollingFileWriter(partitionKey.copy());
+ latestPartitionPath = partitionKey.toPath();
+ }
+
+ latestWriter.write(row);
+ }
+
+ @Override
+ public void close() throws IOException {
+ closeCurrentWriter();
+ latestWriter = null;
+ latestPartitionPath = null;
+ }
+
+ private void closeCurrentWriter() {
+ if (latestWriter != null) {
+ try {
+ latestWriter.close();
+ } catch (IOException e) {
+ LOG.error("Exception occur when closing file {}.", latestPartitionPath);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
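GroupedPartitionedFanoutWriter above keeps at most one open file writer at a time, which is the memory saving the mini-batch option targets: a plain fanout writer holds one open writer per partition until checkpoint. A self-contained sketch of that pattern (PartitionFileWriter is a hypothetical stand-in for Iceberg's RollingFileWriter, not Iceberg's API):

import java.util.ArrayList;
import java.util.List;

public class SingleOpenWriterSketch {

    // Stand-in for a rolling file writer: collects rows for one partition "file".
    static final class PartitionFileWriter {
        final String partition;
        final List<String> rows = new ArrayList<>();
        PartitionFileWriter(String partition) { this.partition = partition; }
        void write(String row) { rows.add(row); }
        void close() { System.out.println("closed " + partition + " with " + rows.size() + " rows"); }
    }

    private String latestPartitionPath;
    private PartitionFileWriter latestWriter;

    // Works only because the input is already grouped by partition: once the
    // partition changes, the previous partition never appears again, so its
    // writer can be closed immediately instead of staying open until checkpoint.
    void write(String partitionPath, String row) {
        if (latestPartitionPath == null || !partitionPath.equals(latestPartitionPath)) {
            closeCurrentWriter();
            latestWriter = new PartitionFileWriter(partitionPath);
            latestPartitionPath = partitionPath;
        }
        latestWriter.write(row);
    }

    void closeCurrentWriter() {
        if (latestWriter != null) {
            latestWriter.close();
            latestWriter = null;
        }
    }

    public static void main(String[] args) {
        SingleOpenWriterSketch sketch = new SingleOpenWriterSketch();
        sketch.write("dt=2023-04-17", "r1");
        sketch.write("dt=2023-04-17", "r2");
        sketch.write("dt=2023-04-18", "r3"); // closes dt=2023-04-17 first
        sketch.closeCurrentWriter();
    }
}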
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergMiniBatchGroupOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergMiniBatchGroupOperator.java
new file mode 100644
index 000000000..b862cee32
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergMiniBatchGroupOperator.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.types.Types.NestedField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class IcebergMiniBatchGroupOperator extends TableStreamOperator<RowData>
+ implements
+ OneInputStreamOperator<RowData, RowData>,
+ BoundedOneInput {
+
+ private static final long serialVersionUID = 9042068324817807379L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergMiniBatchGroupOperator.class);
+
+ private transient StreamRecordCollector<RowData> collector;
+ private transient Map<Tuple2<String, RowData>, RowData> inputBuffer;
+ private transient RowDataWrapper wrapper;
+
+ private final FieldGetter[] fieldsGetter;
+ private final int[] equalityFieldIndex; // positions of the equality fields in the row schema, in ascending order
+ private final PartitionKey partitionKey; // partition key helper
+ private final Schema rowSchema; // the whole field schema
+
+ /**
+ * Initialize field index.
+ *
+ * @param fieldsGetter function to get object from {@link RowData}
+ * @param deleteSchema equality fields schema
+ * @param rowSchema row data schema
+ * @param partitionKey partition key
+ */
+ public IcebergMiniBatchGroupOperator(
+ FieldGetter[] fieldsGetter,
+ Schema deleteSchema,
+ Schema rowSchema,
+ PartitionKey partitionKey) {
+ this.fieldsGetter = fieldsGetter;
+ // Note: `NestedField` does not override equals(), so we match by field id instead of calling indexOf with a `NestedField`
+ this.equalityFieldIndex = deleteSchema.columns().stream()
+ .map(field -> rowSchema.columns()
+ .stream()
+ .map(NestedField::fieldId)
+ .collect(Collectors.toList())
+ .indexOf(field.fieldId()))
+ .sorted()
+ .mapToInt(Integer::valueOf)
+ .toArray();
+ this.partitionKey = partitionKey;
+ this.rowSchema = rowSchema;
+ // Sanity check: every equality field index must be resolved (non-negative) and within the row's field range.
+ Preconditions.checkArgument(
+ Arrays.stream(equalityFieldIndex)
+ .allMatch(index -> index >= 0 && index < fieldsGetter.length),
+ String.format("Any equality field index (%s) should in a legal range.",
+ Arrays.toString(equalityFieldIndex)));
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ LOG.info("Opening IcebergMiniBatchGroupOperator");
+
+ this.collector = new StreamRecordCollector<>(output);
+ this.wrapper = new RowDataWrapper(FlinkSchemaUtil.convert(rowSchema), rowSchema.asStruct());
+ this.inputBuffer = new HashMap<>();
+ }
+
+ @Override
+ public void processElement(StreamRecord<RowData> element) throws Exception {
+ RowData row = element.getValue();
+ RowData primaryKey = GenericRowData.of(Arrays.stream(equalityFieldIndex)
+ .boxed()
+ .map(index -> fieldsGetter[index].getFieldOrNull(row))
+ .toArray(Object[]::new));
+ partitionKey.partition(wrapper.wrap(row));
+
+ if (RowDataUtil.isAccumulateMsg(row)) {
+ inputBuffer.put(new Tuple2<>(partitionKey.toPath(), primaryKey), row);
+ } else {
+ inputBuffer.remove(new Tuple2<>(partitionKey.toPath(), primaryKey));
+ }
+ }
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ super.prepareSnapshotPreBarrier(checkpointId);
+ flush();
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ flush();
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ flush();
+ }
+
+ private void flush() throws Exception {
+ LOG.info("Flushing IcebergMiniBatchGroupOperator.");
+ // Emit the buffered rows grouped by partition, so that each partition's rows reach the writer contiguously
+ if (!inputBuffer.isEmpty()) {
+ Map<String, List<RowData>> rowsByPartition = new HashMap<>();
+ inputBuffer.forEach((key, row) ->
+ rowsByPartition.computeIfAbsent(key.f0, partition -> new ArrayList<>()).add(row));
+ rowsByPartition.values().forEach(rows -> rows.forEach(collector::collect));
+ }
+ inputBuffer.clear();
+ }
+}
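IcebergMiniBatchGroupOperator above implements upsert-style buffering: an insert overwrites the buffered row with the same (partition, primary key), a retraction removes it, and the flush at each checkpoint barrier emits the survivors grouped by partition. A self-contained sketch of the same bookkeeping on plain Java collections (all names hypothetical, not the commit's code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MiniBatchBufferSketch {

    // Buffer keyed by (partitionPath, primaryKey), mirroring the operator's
    // Map<Tuple2<String, RowData>, RowData> inputBuffer.
    private final Map<List<String>, String[]> buffer = new HashMap<>();

    void processInsert(String partition, String key, String payload) {
        // Later version of the same key wins, like an upsert.
        buffer.put(Arrays.asList(partition, key), new String[]{partition, key, payload});
    }

    void processDelete(String partition, String key) {
        buffer.remove(Arrays.asList(partition, key)); // retraction cancels the buffered insert
    }

    // At checkpoint barrier: emit buffered rows grouped by partition so the
    // writer receives each partition's rows contiguously.
    List<String> flush() {
        Map<String, List<String>> byPartition = new LinkedHashMap<>();
        for (String[] row : buffer.values()) {
            byPartition.computeIfAbsent(row[0], p -> new ArrayList<>())
                    .add(row[0] + "/" + row[1] + "=" + row[2]);
        }
        buffer.clear();
        List<String> out = new ArrayList<>();
        byPartition.values().forEach(out::addAll);
        return out;
    }

    public static void main(String[] args) {
        MiniBatchBufferSketch sketch = new MiniBatchBufferSketch();
        sketch.processInsert("dt=2023-04-18", "k1", "v1");
        sketch.processInsert("dt=2023-04-18", "k1", "v2"); // same key: v2 replaces v1
        sketch.processInsert("dt=2023-04-17", "k2", "v3");
        sketch.processDelete("dt=2023-04-17", "k2");       // retracts k2 entirely
        System.out.println(sketch.flush());                // [dt=2023-04-18/k1=v2]
    }
}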
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
index 52559a803..44341feff 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
@@ -56,6 +56,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
private final List<Integer> equalityFieldIds;
private final boolean upsert;
private final boolean appendMode;
+ private final boolean miniBatchMode;
private final FileAppenderFactory<RowData> appenderFactory;
private transient OutputFileFactory outputFileFactory;
@@ -67,7 +68,8 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
FileFormat format,
List<Integer> equalityFieldIds,
boolean upsert,
- boolean appendMode) {
+ boolean appendMode,
+ boolean miniBatchMode) {
this.table = table;
this.schema = scheam;
this.flinkSchema = flinkSchema;
@@ -78,6 +80,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
this.equalityFieldIds = equalityFieldIds;
this.upsert = upsert;
this.appendMode = appendMode;
+ this.miniBatchMode = miniBatchMode;
if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
@@ -110,8 +113,13 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
return new UnpartitionedWriter<>(
spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes);
} else {
- return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
- io, targetFileSizeBytes, schema, flinkSchema);
+ if (miniBatchMode) {
+ return new RowDataGroupedPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
+ io, targetFileSizeBytes, schema, flinkSchema);
+ } else {
+ return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
+ io, targetFileSizeBytes, schema, flinkSchema);
+ }
}
} else {
// Initialize a task writer to write both INSERT and equality DELETE.
@@ -119,8 +127,13 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
} else {
- return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
- targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
+ if (miniBatchMode) {
+ return new GroupedPartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
+ targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
+ } else {
+ return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
+ targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
+ }
}
}
}
@@ -145,4 +158,25 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
return partitionKey;
}
}
+
+ private static class RowDataGroupedPartitionedFanoutWriter extends GroupedPartitionedFanoutWriter<RowData> {
+
+ private final PartitionKey partitionKey;
+ private final RowDataWrapper rowDataWrapper;
+
+ RowDataGroupedPartitionedFanoutWriter(
+ PartitionSpec spec, FileFormat format, FileAppenderFactory<RowData> appenderFactory,
+ OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema,
+ RowType flinkSchema) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+ this.partitionKey = new PartitionKey(spec, schema);
+ this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+ }
+
+ @Override
+ protected PartitionKey partition(RowData row) {
+ partitionKey.partition(rowDataWrapper.wrap(row));
+ return partitionKey;
+ }
+ }
}
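With this change the factory picks among six task writers. A condensed, runnable restatement of the branching in create() (a sketch only; the real method returns TaskWriter instances rather than class names):

public class WriterSelectionSketch {

    static String selectWriter(boolean hasEqualityFields, boolean appendMode,
                               boolean partitioned, boolean miniBatchMode) {
        if (!hasEqualityFields || appendMode) {
            // Insert-only path
            if (!partitioned) {
                return "UnpartitionedWriter";
            }
            return miniBatchMode ? "RowDataGroupedPartitionedFanoutWriter"
                                 : "RowDataPartitionedFanoutWriter";
        }
        // Path writing both INSERT and equality DELETE
        if (!partitioned) {
            return "UnpartitionedDeltaWriter";
        }
        return miniBatchMode ? "GroupedPartitionedDeltaWriter" : "PartitionedDeltaWriter";
    }

    public static void main(String[] args) {
        System.out.println(selectWriter(true, false, true, true));  // GroupedPartitionedDeltaWriter
        System.out.println(selectWriter(false, false, true, true)); // RowDataGroupedPartitionedFanoutWriter
        System.out.println(selectWriter(true, false, false, true)); // UnpartitionedDeltaWriter (mini batch moot)
    }
}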
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 864ce85da..3e29f20fc 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -219,7 +219,8 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
fileFormat,
equalityFieldIds,
upsertMode,
- appendMode);
+ appendMode,
+ false);
if (multipleWriters.get(tableId) == null) {
StringBuilder subWriterInlongMetric = new StringBuilder(inlongMetric);