You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by pa...@apache.org on 2023/01/13 07:59:51 UTC
[inlong] branch master updated: [INLONG-7197][Sort] Iceberg connector supports keyby with the primary key (#7217)
This is an automated email from the ASF dual-hosted git repository.
pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f0538163a [INLONG-7197][Sort] Iceberg connector supports keyby with the primary key (#7217)
f0538163a is described below
commit f0538163a7e199ece22f8b29d25a674d03eb0a3f
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Fri Jan 13 15:59:45 2023 +0800
[INLONG-7197][Sort] Iceberg connector supports keyby with the primary key (#7217)
---
.../iceberg/sink/EqualityFieldKeySelector.java | 90 +++++++++
.../apache/inlong/sort/iceberg/sink/FlinkSink.java | 212 +++++++++++++--------
licenses/inlong-sort-connectors/LICENSE | 1 +
3 files changed, 225 insertions(+), 78 deletions(-)
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java
new file mode 100644
index 000000000..8feaa6a4c
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.StructProjection;
+
+/**
+ * A {@link KeySelector} that shuffles rows by their equality fields, so that records with the same
+ * equality field values are emitted to the same writer, in order.
+ * {@code EqualityFieldKeySelector} is copied from https://github.com/apache/iceberg/blob/e2bb9ad7e792efca419fa7c4a1afde7c4c44fa01/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java#L36
+ */
+class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {
+
+ private final Schema schema;
+ private final RowType flinkSchema;
+ private final Schema deleteSchema; // the table schema narrowed to the equality field ids
+
+ private transient RowDataWrapper rowDataWrapper; // transient helpers: built on first use by the lazy* methods
+ private transient StructProjection structProjection;
+ private transient StructLikeWrapper structLikeWrapper;
+
+ EqualityFieldKeySelector(Schema schema, RowType flinkSchema, List<Integer> equalityFieldIds) {
+ this.schema = schema;
+ this.flinkSchema = flinkSchema;
+ this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
+ }
+
+ /**
+ * Constructs the {@link RowDataWrapper} lazily because a few of its members are not
+ * serializable; this way they never have to be force-serialized along with the selector.
+ */
+ protected RowDataWrapper lazyRowDataWrapper() {
+ if (rowDataWrapper == null) {
+ rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+ }
+ return rowDataWrapper;
+ }
+
+ /**
+ * Constructs the {@link StructProjection} on first use because it is not serializable.
+ */
+ protected StructProjection lazyStructProjection() {
+ if (structProjection == null) {
+ structProjection = StructProjection.create(schema, deleteSchema);
+ }
+ return structProjection;
+ }
+
+ /**
+ * Constructs the {@link StructLikeWrapper} on first use because it is not serializable.
+ */
+ protected StructLikeWrapper lazyStructLikeWrapper() {
+ if (structLikeWrapper == null) {
+ structLikeWrapper = StructLikeWrapper.forType(deleteSchema.asStruct());
+ }
+ return structLikeWrapper;
+ }
+
+ @Override
+ public Integer getKey(RowData row) {
+ RowDataWrapper wrappedRowData = lazyRowDataWrapper().wrap(row); // the wrappers are reused: each call re-points them at the current row
+ StructProjection projectedRowData = lazyStructProjection().wrap(wrappedRowData); // projection from schema onto deleteSchema
+ StructLikeWrapper wrapper = lazyStructLikeWrapper().set(projectedRowData);
+ return wrapper.hashCode(); // the hash code of the wrapper is the shuffle key
+ }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index b0ed02abe..2fc1ca13d 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -17,9 +17,12 @@
package org.apache.inlong.sort.iceberg.sink;
+import java.util.Set;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -32,7 +35,6 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
-import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -42,12 +44,17 @@ import org.apache.iceberg.actions.ActionsProvider;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkWriteConf;
+import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
@@ -68,18 +75,12 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
-import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
/**
* Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
@@ -155,13 +156,13 @@ public class FlinkSink {
private Table table;
private TableSchema tableSchema;
private ActionsProvider actionProvider;
- private boolean overwrite = false;
private boolean appendMode = false;
- private DistributionMode distributionMode = null;
private Integer writeParallelism = null;
- private boolean upsert = false;
private List<String> equalityFieldColumns = null;
private String uidPrefix = null;
+ private ReadableConfig readableConfig = new Configuration();
+ private final Map<String, String> writeOptions = Maps.newHashMap();
+ private FlinkWriteConf flinkWriteConf = null;
private String inlongMetric = null;
private String auditHostAndPorts = null;
private CatalogLoader catalogLoader = null;
@@ -248,7 +249,7 @@ public class FlinkSink {
}
public Builder overwrite(boolean newOverwrite) {
- this.overwrite = newOverwrite;
+ writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite)); // kept as a string write option; the committers read it back via flinkWriteConf.overwriteMode()
return this;
}
@@ -304,9 +305,12 @@ public class FlinkSink {
* @return {@link Builder} to connect the iceberg table.
*/
public Builder distributionMode(DistributionMode mode) {
- Preconditions.checkArgument(!DistributionMode.RANGE.equals(mode),
+ Preconditions.checkArgument(
+ !DistributionMode.RANGE.equals(mode),
"Flink does not support 'range' write distribution mode now.");
- this.distributionMode = mode;
+ if (mode != null) { // a null mode records nothing, so no job-level distribution mode is put into writeOptions
+ writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
+ }
return this;
}
@@ -331,7 +335,7 @@ public class FlinkSink {
* @return {@link Builder} to connect the iceberg table.
*/
public Builder upsert(boolean enabled) {
- this.upsert = enabled;
+ writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled)); // kept as a string write option; read back via flinkWriteConf.upsertMode()
return this;
}
@@ -394,15 +398,21 @@ public class FlinkSink {
}
}
+ flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig);
+
+ // Find out the equality field id list based on the user-provided equality field column names.
+ List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
+
// Convert the requested flink table schema to flink row type.
RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
// Distribute the records from input data stream based on the write.distribution-mode.
DataStream<RowData> distributeStream = distributeDataStream(
- rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
+ rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType);
// Add parallel writers that append rows to files
- SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType);
+ SingleOutputStreamOperator<WriteResult> writerStream =
+ appendWriter(distributeStream, flinkRowType, equalityFieldIds);
// Add single-parallelism committer that commits files
// after successful checkpoint or end of input
@@ -453,6 +463,35 @@ public class FlinkSink {
return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
}
+ @VisibleForTesting
+ List<Integer> checkAndGetEqualityFieldIds() {
+ List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds()); // default: the identifier fields of the table schema
+ if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) { // job-level columns, when given, replace the default
+ Set<Integer> equalityFieldSet =
+ Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
+ for (String column : equalityFieldColumns) {
+ org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
+ Preconditions.checkNotNull( // rejects a configured column name that the table schema does not contain
+ field,
+ "Missing required equality field column '%s' in table schema %s",
+ column,
+ table.schema());
+ equalityFieldSet.add(field.fieldId());
+ }
+
+ if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) { // a mismatch is only logged; the job-level columns are still used
+ LOG.warn(
+ "The configured equality field column IDs {} are not matched with the schema identifier "
+ + "field IDs {}, use job specified equality field columns as the equality fields "
+ + "by default.",
+ equalityFieldSet,
+ table.schema().identifierFieldIds());
+ }
+ equalityFieldIds = Lists.newArrayList(equalityFieldSet);
+ }
+ return equalityFieldIds;
+ }
+
+
@SuppressWarnings("unchecked")
private <T> DataStreamSink<T> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
DataStreamSink<T> resultStream = committerStream
@@ -469,7 +508,10 @@ public class FlinkSink {
private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
IcebergProcessOperator<WriteResult, Void> filesCommitter = new IcebergProcessOperator<>(
new IcebergSingleFileCommiter(
- TableIdentifier.of(table.name()), tableLoader, overwrite, actionProvider));
+ TableIdentifier.of(table.name()),
+ tableLoader,
+ flinkWriteConf.overwriteMode(),
+ actionProvider));
SingleOutputStreamOperator<Void> committerStream = writerStream
.transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
.setParallelism(1)
@@ -483,7 +525,8 @@ public class FlinkSink {
private SingleOutputStreamOperator<Void> appendMultipleCommitter(
SingleOutputStreamOperator<MultipleWriteResult> writerStream) {
IcebergProcessOperator<MultipleWriteResult, Void> multipleFilesCommiter =
- new IcebergProcessOperator<>(new IcebergMultipleFilesCommiter(catalogLoader, overwrite));
+ new IcebergProcessOperator<>(new IcebergMultipleFilesCommiter(catalogLoader,
+ flinkWriteConf.overwriteMode()));
SingleOutputStreamOperator<Void> committerStream = writerStream
.transform(operatorName(ICEBERG_MULTIPLE_FILES_COMMITTER_NAME), Types.VOID, multipleFilesCommiter)
.setParallelism(1)
@@ -494,26 +537,16 @@ public class FlinkSink {
return committerStream;
}
- private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
- // Find out the equality field id list based on the user-provided equality field column names.
- List<Integer> equalityFieldIds = Lists.newArrayList();
- if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
- for (String column : equalityFieldColumns) {
- org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
- Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
- column, table.schema());
- equalityFieldIds.add(field.fieldId());
- }
- }
-
+ private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType,
+ List<Integer> equalityFieldIds) {
// Fallback to use upsert mode parsed from table properties if don't specify in job level.
// Only if not appendMode, upsert can be valid.
- boolean upsertMode = (upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+ boolean upsertMode = (flinkWriteConf.upsertMode() || PropertyUtil.propertyAsBoolean(table.properties(),
UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT)) && !appendMode;
// Validate the equality fields and partition fields if we enable the upsert mode.
if (upsertMode) {
- Preconditions.checkState(!overwrite,
+ Preconditions.checkState(!flinkWriteConf.overwriteMode(),
"OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
Preconditions.checkState(!equalityFieldIds.isEmpty(),
"Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
@@ -527,8 +560,8 @@ public class FlinkSink {
}
IcebergProcessOperator<RowData, WriteResult> streamWriter = createStreamWriter(
- table, flinkRowType, equalityFieldIds, upsertMode, appendMode, inlongMetric,
- auditHostAndPorts, dirtyOptions, dirtySink);
+ table, flinkRowType, equalityFieldIds, flinkWriteConf,
+ appendMode, inlongMetric, auditHostAndPorts, dirtyOptions, dirtySink);
int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -570,43 +603,75 @@ public class FlinkSink {
return writerStream;
}
- private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
- Map<String, String> properties,
+ private DataStream<RowData> distributeDataStream( // chooses how rows are shuffled before the writers, or returns input untouched
+ DataStream<RowData> input,
+ List<Integer> equalityFieldIds,
PartitionSpec partitionSpec,
Schema iSchema,
RowType flinkRowType) {
- DistributionMode writeMode;
- if (distributionMode == null) {
- // Fallback to use distribution mode parsed from table properties if don't specify in job level.
- String modeName = PropertyUtil.propertyAsString(properties,
- WRITE_DISTRIBUTION_MODE,
- WRITE_DISTRIBUTION_MODE_NONE);
-
- writeMode = DistributionMode.fromName(modeName);
- } else {
- writeMode = distributionMode;
- }
-
+ DistributionMode writeMode = flinkWriteConf.distributionMode(); // mode as resolved by FlinkWriteConf, which is built from table, writeOptions and readableConfig
+ LOG.info("Write distribution mode is '{}'", writeMode.modeName());
switch (writeMode) {
case NONE:
- return input;
+ if (equalityFieldIds.isEmpty()) {
+ return input;
+ } else {
+ LOG.info("Distribute rows by equality fields, because there are equality fields set");
+ return input.keyBy( // even with mode none, rows are keyed by their equality fields when any are set
+ new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+ }
case HASH:
- if (partitionSpec.isUnpartitioned()) {
- return input;
+ if (equalityFieldIds.isEmpty()) {
+ if (partitionSpec.isUnpartitioned()) {
+ LOG.warn(
+ "Fallback to use 'none' distribution mode, because there are no equality fields "
+ + "set and table is unpartitioned");
+ return input;
+ } else {
+ return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+ }
} else {
- return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+ if (partitionSpec.isUnpartitioned()) {
+ LOG.info(
+ "Distribute rows by equality fields, because there are equality fields set "
+ + "and table is unpartitioned");
+ return input.keyBy(
+ new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+ } else {
+ for (PartitionField partitionField : partitionSpec.fields()) { // every partition source column must be one of the equality fields
+ Preconditions.checkState(
+ equalityFieldIds.contains(partitionField.sourceId()),
+ "In 'hash' distribution mode with equality fields set, partition field '%s' "
+ + "should be included in equality fields: '%s'",
+ partitionField,
+ equalityFieldColumns);
+ }
+ return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); // keyed by partition here, not by the equality fields
+ }
}
case RANGE:
- LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
- WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
- return input;
+ if (equalityFieldIds.isEmpty()) { // range itself is never applied: either no shuffle or a shuffle by equality fields
+ LOG.warn(
+ "Fallback to use 'none' distribution mode, because there are no equality fields set "
+ + "and {}=range is not supported yet in flink",
+ WRITE_DISTRIBUTION_MODE);
+ return input;
+ } else {
+ LOG.info(
+ "Distribute rows by equality fields, because there are equality fields set "
+ + "and {}=range is not supported yet in flink",
+ WRITE_DISTRIBUTION_MODE);
+ return input.keyBy(
+ new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+ }
default:
- throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
+ throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
}
}
+
}
static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
@@ -629,37 +694,28 @@ public class FlinkSink {
static IcebergProcessOperator<RowData, WriteResult> createStreamWriter(Table table,
RowType flinkRowType,
List<Integer> equalityFieldIds,
- boolean upsert,
+ FlinkWriteConf flinkWriteConf,
boolean appendMode,
String inlongMetric,
String auditHostAndPorts,
DirtyOptions dirtyOptions,
@Nullable DirtySink<Object> dirtySink) {
- // flink A, iceberg a
Preconditions.checkArgument(table != null, "Iceberg table should't be null");
- Map<String, String> props = table.properties();
- long targetFileSize = getTargetFileSizeBytes(props);
- FileFormat fileFormat = getFileFormat(props);
Table serializableTable = SerializableTable.copyOf(table);
- TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
- serializableTable, serializableTable.schema(), flinkRowType, targetFileSize,
- fileFormat, equalityFieldIds, upsert, appendMode);
- // Set null for flinkRowType of IcebergSingleStreamWriter
- // to avoid frequent Field.Getter creation in dirty data sink.
- return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>(
- table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts,
- null, dirtyOptions, dirtySink));
- }
+ TaskWriterFactory<RowData> taskWriterFactory =
+ new RowDataTaskWriterFactory(
+ serializableTable,
+ serializableTable.schema(),
+ flinkRowType,
+ flinkWriteConf.targetDataFileSize(), // file size and format now come from FlinkWriteConf instead of the removed table-property helpers
+ flinkWriteConf.dataFileFormat(),
+ equalityFieldIds,
+ flinkWriteConf.upsertMode(), // NOTE(review): the caller used to pass its computed upsertMode, which also applied "&& !appendMode" - confirm the factory copes with upsert and appendMode both true
+ appendMode);
- private static FileFormat getFileFormat(Map<String, String> properties) {
- String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
- return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+ return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>( // the null argument on the next line is flinkRowType, left unset to avoid frequent Field.Getter creation in the dirty data sink
+ table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts, null, dirtyOptions, dirtySink));
}
- private static long getTargetFileSizeBytes(Map<String, String> properties) {
- return PropertyUtil.propertyAsLong(properties,
- WRITE_TARGET_FILE_SIZE_BYTES,
- WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
- }
}
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 693c6f5f8..a362ffeee 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -521,6 +521,7 @@
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java