Posted to commits@inlong.apache.org by pa...@apache.org on 2023/01/13 07:59:51 UTC

[inlong] branch master updated: [INLONG-7197][Sort] Iceberg connector supports keyby with the primary key (#7217)

This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f0538163a [INLONG-7197][Sort] Iceberg connector supports keyby with the primary key (#7217)
f0538163a is described below

commit f0538163a7e199ece22f8b29d25a674d03eb0a3f
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Fri Jan 13 15:59:45 2023 +0800

    [INLONG-7197][Sort] Iceberg connector supports keyby with the primary key (#7217)
---
 .../iceberg/sink/EqualityFieldKeySelector.java     |  90 +++++++++
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java | 212 +++++++++++++--------
 licenses/inlong-sort-connectors/LICENSE            |   1 +
 3 files changed, 225 insertions(+), 78 deletions(-)
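
For context: the heart of this change is that changelog records are now routed through Flink's keyBy before the
Iceberg writers, so every insert/update/delete event for a given primary key reaches the same writer subtask in
order. A minimal, illustrative sketch of that routing follows; the class name and the assumption that field 0
holds the primary key are illustrative, not code from this commit.

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;

    /** Illustrative only: route rows by a hypothetical integer primary key at field 0. */
    public class PrimaryKeyShuffleSketch {
        public static DataStream<RowData> shuffleByPrimaryKey(DataStream<RowData> input) {
            // Rows with equal keys land on the same writer subtask, which preserves
            // the per-key order of insert/update/delete events.
            return input.keyBy(new KeySelector<RowData, Integer>() {
                @Override
                public Integer getKey(RowData row) {
                    return row.getInt(0); // assumption: field 0 holds the primary key
                }
            });
        }
    }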

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java
new file mode 100644
index 000000000..8feaa6a4c
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.StructProjection;
+
+/**
+ * Create a {@link KeySelector} to shuffle by equality fields, to ensure that records with the same
+ * equality fields are emitted to the same writer in order.
+ * `EqualityFieldKeySelector` is copied from https://github.com/apache/iceberg/blob/e2bb9ad7e792efca419fa7c4a1afde7c4c44fa01/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java#L36
+ */
+class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {
+
+    private final Schema schema;
+    private final RowType flinkSchema;
+    private final Schema deleteSchema;
+
+    private transient RowDataWrapper rowDataWrapper;
+    private transient StructProjection structProjection;
+    private transient StructLikeWrapper structLikeWrapper;
+
+    EqualityFieldKeySelector(Schema schema, RowType flinkSchema, List<Integer> equalityFieldIds) {
+        this.schema = schema;
+        this.flinkSchema = flinkSchema;
+        this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
+    }
+
+    /**
+     * Construct the {@link RowDataWrapper} lazily here because some of its members are not
+     * serializable. This way, we are not forced to serialize them.
+     */
+    protected RowDataWrapper lazyRowDataWrapper() {
+        if (rowDataWrapper == null) {
+            rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+        }
+        return rowDataWrapper;
+    }
+
+    /**
+     * Construct the {@link StructProjection} lazily because it is not serializable.
+     */
+    protected StructProjection lazyStructProjection() {
+        if (structProjection == null) {
+            structProjection = StructProjection.create(schema, deleteSchema);
+        }
+        return structProjection;
+    }
+
+    /**
+     * Construct the {@link StructLikeWrapper} lazily because it is not serializable.
+     */
+    protected StructLikeWrapper lazyStructLikeWrapper() {
+        if (structLikeWrapper == null) {
+            structLikeWrapper = StructLikeWrapper.forType(deleteSchema.asStruct());
+        }
+        return structLikeWrapper;
+    }
+
+    @Override
+    public Integer getKey(RowData row) {
+        RowDataWrapper wrappedRowData = lazyRowDataWrapper().wrap(row);
+        StructProjection projectedRowData = lazyStructProjection().wrap(wrappedRowData);
+        StructLikeWrapper wrapper = lazyStructLikeWrapper().set(projectedRowData);
+        return wrapper.hashCode();
+    }
+}
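
The selector above is what the reworked FlinkSink hands to keyBy. Below is a condensed sketch of how equality
column names could be resolved to Iceberg field ids and wired in, mirroring the checkAndGetEqualityFieldIds()
logic added further down; the sketch lives in the same package because EqualityFieldKeySelector is
package-private, and the helper names are illustrative.

    package org.apache.inlong.sort.iceberg.sink;

    import java.util.List;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;
    import org.apache.iceberg.types.Types;

    class EqualityFieldWiringSketch {

        /** Resolve equality column names to Iceberg field ids, as FlinkSink now does. */
        static List<Integer> toFieldIds(Schema schema, List<String> columns) {
            List<Integer> ids = Lists.newArrayList();
            for (String column : columns) {
                Types.NestedField field = schema.findField(column); // null if the column is absent
                if (field == null) {
                    throw new IllegalArgumentException("Missing equality field column: " + column);
                }
                ids.add(field.fieldId());
            }
            return ids;
        }

        /** Shuffle so that records sharing equality fields go to the same writer. */
        static DataStream<RowData> shuffle(DataStream<RowData> input, Schema schema,
                RowType rowType, List<String> equalityColumns) {
            return input.keyBy(new EqualityFieldKeySelector(
                    schema, rowType, toFieldIds(schema, equalityColumns)));
        }
    }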
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index b0ed02abe..2fc1ca13d 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -17,9 +17,12 @@
 
 package org.apache.inlong.sort.iceberg.sink;
 
+import java.util.Set;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -32,7 +35,6 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -42,12 +44,17 @@ import org.apache.iceberg.actions.ActionsProvider;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkWriteConf;
+import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.sink.TaskWriterFactory;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
@@ -68,18 +75,12 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
 
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
-import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
 
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
@@ -155,13 +156,13 @@ public class FlinkSink {
         private Table table;
         private TableSchema tableSchema;
         private ActionsProvider actionProvider;
-        private boolean overwrite = false;
         private boolean appendMode = false;
-        private DistributionMode distributionMode = null;
         private Integer writeParallelism = null;
-        private boolean upsert = false;
         private List<String> equalityFieldColumns = null;
         private String uidPrefix = null;
+        private ReadableConfig readableConfig = new Configuration();
+        private final Map<String, String> writeOptions = Maps.newHashMap();
+        private FlinkWriteConf flinkWriteConf = null;
         private String inlongMetric = null;
         private String auditHostAndPorts = null;
         private CatalogLoader catalogLoader = null;
@@ -248,7 +249,7 @@ public class FlinkSink {
         }
 
         public Builder overwrite(boolean newOverwrite) {
-            this.overwrite = newOverwrite;
+            writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
             return this;
         }
 
@@ -304,9 +305,12 @@ public class FlinkSink {
          * @return {@link Builder} to connect the iceberg table.
          */
         public Builder distributionMode(DistributionMode mode) {
-            Preconditions.checkArgument(!DistributionMode.RANGE.equals(mode),
+            Preconditions.checkArgument(
+                    !DistributionMode.RANGE.equals(mode),
                     "Flink does not support 'range' write distribution mode now.");
-            this.distributionMode = mode;
+            if (mode != null) {
+                writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
+            }
             return this;
         }
 
@@ -331,7 +335,7 @@ public class FlinkSink {
          * @return {@link Builder} to connect the iceberg table.
          */
         public Builder upsert(boolean enabled) {
-            this.upsert = enabled;
+            writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled));
             return this;
         }
 
@@ -394,15 +398,21 @@ public class FlinkSink {
                 }
             }
 
+            flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig);
+
+            // Find out the equality field id list based on the user-provided equality field column names.
+            List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
+
             // Convert the requested flink table schema to flink row type.
             RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
             // Distribute the records from input data stream based on the write.distribution-mode.
             DataStream<RowData> distributeStream = distributeDataStream(
-                    rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
+                    rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType);
 
             // Add parallel writers that append rows to files
-            SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType);
+            SingleOutputStreamOperator<WriteResult> writerStream =
+                    appendWriter(distributeStream, flinkRowType, equalityFieldIds);
 
             // Add single-parallelism committer that commits files
             // after successful checkpoint or end of input
@@ -453,6 +463,35 @@ public class FlinkSink {
             return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
         }
 
+        @VisibleForTesting
+        List<Integer> checkAndGetEqualityFieldIds() {
+            List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
+            if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
+                Set<Integer> equalityFieldSet =
+                        Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
+                for (String column : equalityFieldColumns) {
+                    org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
+                    Preconditions.checkNotNull(
+                            field,
+                            "Missing required equality field column '%s' in table schema %s",
+                            column,
+                            table.schema());
+                    equalityFieldSet.add(field.fieldId());
+                }
+
+                if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) {
+                    LOG.warn(
+                            "The configured equality field column IDs {} do not match the schema identifier "
+                                    + "field IDs {}; using the job-specified equality field columns as the "
+                                    + "equality fields by default.",
+                            equalityFieldSet,
+                            table.schema().identifierFieldIds());
+                }
+                equalityFieldIds = Lists.newArrayList(equalityFieldSet);
+            }
+            return equalityFieldIds;
+        }
+
         @SuppressWarnings("unchecked")
         private <T> DataStreamSink<T> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
             DataStreamSink<T> resultStream = committerStream
@@ -469,7 +508,10 @@ public class FlinkSink {
         private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
             IcebergProcessOperator<WriteResult, Void> filesCommitter = new IcebergProcessOperator<>(
                     new IcebergSingleFileCommiter(
-                            TableIdentifier.of(table.name()), tableLoader, overwrite, actionProvider));
+                            TableIdentifier.of(table.name()),
+                            tableLoader,
+                            flinkWriteConf.overwriteMode(),
+                            actionProvider));
             SingleOutputStreamOperator<Void> committerStream = writerStream
                     .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
                     .setParallelism(1)
@@ -483,7 +525,8 @@ public class FlinkSink {
         private SingleOutputStreamOperator<Void> appendMultipleCommitter(
                 SingleOutputStreamOperator<MultipleWriteResult> writerStream) {
             IcebergProcessOperator<MultipleWriteResult, Void> multipleFilesCommiter =
-                    new IcebergProcessOperator<>(new IcebergMultipleFilesCommiter(catalogLoader, overwrite));
+                    new IcebergProcessOperator<>(new IcebergMultipleFilesCommiter(catalogLoader,
+                            flinkWriteConf.overwriteMode()));
             SingleOutputStreamOperator<Void> committerStream = writerStream
                     .transform(operatorName(ICEBERG_MULTIPLE_FILES_COMMITTER_NAME), Types.VOID, multipleFilesCommiter)
                     .setParallelism(1)
@@ -494,26 +537,16 @@ public class FlinkSink {
             return committerStream;
         }
 
-        private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
-            // Find out the equality field id list based on the user-provided equality field column names.
-            List<Integer> equalityFieldIds = Lists.newArrayList();
-            if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-                for (String column : equalityFieldColumns) {
-                    org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-                    Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-                            column, table.schema());
-                    equalityFieldIds.add(field.fieldId());
-                }
-            }
-
+        private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType,
+                List<Integer> equalityFieldIds) {
             // Fall back to the upsert mode parsed from table properties if it is not specified at the job level.
             // Upsert is only valid when appendMode is disabled.
-            boolean upsertMode = (upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+            boolean upsertMode = (flinkWriteConf.upsertMode() || PropertyUtil.propertyAsBoolean(table.properties(),
                     UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT)) && !appendMode;
 
             // Validate the equality fields and partition fields if we enable the upsert mode.
             if (upsertMode) {
-                Preconditions.checkState(!overwrite,
+                Preconditions.checkState(!flinkWriteConf.overwriteMode(),
                         "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
                 Preconditions.checkState(!equalityFieldIds.isEmpty(),
                         "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
@@ -527,8 +560,8 @@ public class FlinkSink {
             }
 
             IcebergProcessOperator<RowData, WriteResult> streamWriter = createStreamWriter(
-                    table, flinkRowType, equalityFieldIds, upsertMode, appendMode, inlongMetric,
-                    auditHostAndPorts, dirtyOptions, dirtySink);
+                    table, flinkRowType, equalityFieldIds, flinkWriteConf,
+                    appendMode, inlongMetric, auditHostAndPorts, dirtyOptions, dirtySink);
 
             int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
             SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -570,43 +603,75 @@ public class FlinkSink {
             return writerStream;
         }
 
-        private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
-                Map<String, String> properties,
+        private DataStream<RowData> distributeDataStream(
+                DataStream<RowData> input,
+                List<Integer> equalityFieldIds,
                 PartitionSpec partitionSpec,
                 Schema iSchema,
                 RowType flinkRowType) {
-            DistributionMode writeMode;
-            if (distributionMode == null) {
-                // Fallback to use distribution mode parsed from table properties if don't specify in job level.
-                String modeName = PropertyUtil.propertyAsString(properties,
-                        WRITE_DISTRIBUTION_MODE,
-                        WRITE_DISTRIBUTION_MODE_NONE);
-
-                writeMode = DistributionMode.fromName(modeName);
-            } else {
-                writeMode = distributionMode;
-            }
-
+            DistributionMode writeMode = flinkWriteConf.distributionMode();
+            LOG.info("Write distribution mode is '{}'", writeMode.modeName());
             switch (writeMode) {
                 case NONE:
-                    return input;
+                    if (equalityFieldIds.isEmpty()) {
+                        return input;
+                    } else {
+                        LOG.info("Distribute rows by equality fields, because there are equality fields set");
+                        return input.keyBy(
+                                new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+                    }
 
                 case HASH:
-                    if (partitionSpec.isUnpartitioned()) {
-                        return input;
+                    if (equalityFieldIds.isEmpty()) {
+                        if (partitionSpec.isUnpartitioned()) {
+                            LOG.warn(
+                                    "Fallback to use 'none' distribution mode, because there are no equality fields "
+                                            + "set and table is unpartitioned");
+                            return input;
+                        } else {
+                            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+                        }
                     } else {
-                        return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+                        if (partitionSpec.isUnpartitioned()) {
+                            LOG.info(
+                                    "Distribute rows by equality fields, because there are equality fields set "
+                                            + "and table is unpartitioned");
+                            return input.keyBy(
+                                    new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+                        } else {
+                            for (PartitionField partitionField : partitionSpec.fields()) {
+                                Preconditions.checkState(
+                                        equalityFieldIds.contains(partitionField.sourceId()),
+                                        "In 'hash' distribution mode with equality fields set, partition field '%s' "
+                                                + "should be included in equality fields: '%s'",
+                                        partitionField,
+                                        equalityFieldColumns);
+                            }
+                            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+                        }
                     }
 
                 case RANGE:
-                    LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
-                            WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
-                    return input;
+                    if (equalityFieldIds.isEmpty()) {
+                        LOG.warn(
+                                "Fallback to use 'none' distribution mode, because there are no equality fields set "
+                                        + "and {}=range is not supported yet in flink",
+                                WRITE_DISTRIBUTION_MODE);
+                        return input;
+                    } else {
+                        LOG.info(
+                                "Distribute rows by equality fields, because there are equality fields set "
+                                        + "and {}=range is not supported yet in flink",
+                                WRITE_DISTRIBUTION_MODE);
+                        return input.keyBy(
+                                new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+                    }
 
                 default:
-                    throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
+                    throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
             }
         }
+
     }
 
     static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
@@ -629,37 +694,28 @@ public class FlinkSink {
     static IcebergProcessOperator<RowData, WriteResult> createStreamWriter(Table table,
             RowType flinkRowType,
             List<Integer> equalityFieldIds,
-            boolean upsert,
+            FlinkWriteConf flinkWriteConf,
             boolean appendMode,
             String inlongMetric,
             String auditHostAndPorts,
             DirtyOptions dirtyOptions,
             @Nullable DirtySink<Object> dirtySink) {
-        // flink A, iceberg a
         Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
-        Map<String, String> props = table.properties();
-        long targetFileSize = getTargetFileSizeBytes(props);
-        FileFormat fileFormat = getFileFormat(props);
 
         Table serializableTable = SerializableTable.copyOf(table);
-        TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
-                serializableTable, serializableTable.schema(), flinkRowType, targetFileSize,
-                fileFormat, equalityFieldIds, upsert, appendMode);
-        // Set null for flinkRowType of IcebergSingleStreamWriter
-        // to avoid frequent Field.Getter creation in dirty data sink.
-        return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>(
-                table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts,
-                null, dirtyOptions, dirtySink));
-    }
+        TaskWriterFactory<RowData> taskWriterFactory =
+                new RowDataTaskWriterFactory(
+                        serializableTable,
+                        serializableTable.schema(),
+                        flinkRowType,
+                        flinkWriteConf.targetDataFileSize(),
+                        flinkWriteConf.dataFileFormat(),
+                        equalityFieldIds,
+                        flinkWriteConf.upsertMode(),
+                        appendMode);
 
-    private static FileFormat getFileFormat(Map<String, String> properties) {
-        String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
-        return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+        return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>(
+                table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts, null, dirtyOptions, dirtySink));
     }
 
-    private static long getTargetFileSizeBytes(Map<String, String> properties) {
-        return PropertyUtil.propertyAsLong(properties,
-                WRITE_TARGET_FILE_SIZE_BYTES,
-                WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    }
 }
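
Taken together, the FlinkSink changes move write settings (overwrite, upsert, distribution mode, target file
size, file format) out of ad-hoc builder fields and into a writeOptions map resolved through Iceberg's
FlinkWriteConf, and equality field ids now default to the table's identifier fields. A hedged usage sketch
follows; it assumes the Builder keeps upstream Iceberg's forRowData(...)/tableLoader(...)/append() surface,
which this diff does not show.

    import java.util.Arrays;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.DistributionMode;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.inlong.sort.iceberg.sink.FlinkSink;

    public class UpsertSinkWiringSketch {
        public static void wire(DataStream<RowData> rows, TableLoader loader) {
            FlinkSink.forRowData(rows)                         // assumed entry point, mirroring upstream Iceberg
                    .tableLoader(loader)
                    .equalityFieldColumns(Arrays.asList("id")) // primary key column(s); defaults to identifier fields
                    .upsert(true)                              // now stored in writeOptions, read via FlinkWriteConf
                    .distributionMode(DistributionMode.HASH)   // rows keyed by partition and/or equality fields
                    .append();                                 // assumed terminal call
        }
    }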
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 693c6f5f8..a362ffeee 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -521,6 +521,7 @@
        inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
        inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
        inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java
        inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
        inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
        inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java