Posted to commits@inlong.apache.org by zi...@apache.org on 2022/09/08 07:59:23 UTC

[inlong] 02/02: [INLONG-5193][Sort] Add dlc small file compact feature and adapt newest auth method (#5243)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 31ea1afd9e729a0664f1b652680208d32680fe78
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Tue Sep 6 19:15:57 2022 +0800

    [INLONG-5193][Sort] Add dlc small file compact feature and adapt newest auth method (#5243)
---
 .../inlong/sort/protocol/constant/DLCConstant.java |   33 +-
 .../protocol/node/load/DLCIcebergLoadNode.java     |   21 +-
 .../protocol/node/load/DLCIcebergLoadNodeTest.java |    5 +-
 .../inlong/sort/base/metric/MetricOption.java      |   89 ++
 .../inlong/sort/base/metric/SinkMetricData.java    |    9 +-
 .../inlong/sort/base/metric/SourceMetricData.java  |   26 +-
 inlong-sort/sort-connectors/iceberg-dlc/pom.xml    |   26 +-
 .../catalog/hybris/DlcWrappedHybrisCatalog.java    |   45 +-
 .../sort/iceberg/flink/CompactTableProperties.java |   98 ++
 .../inlong/sort/iceberg/flink}/FlinkCatalog.java   | 1463 ++++++++++----------
 .../sort/iceberg/flink}/FlinkCatalogFactory.java   |  333 ++---
 .../iceberg/flink}/FlinkDynamicTableFactory.java   |  455 +++---
 .../sort/iceberg/flink}/IcebergTableSink.java      |   47 +-
 .../sort/iceberg/flink/actions/RewriteResult.java  |   32 +
 .../flink/actions/SyncRewriteDataFilesAction.java  |  133 ++
 .../actions/SyncRewriteDataFilesActionOption.java  |  171 +++
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    |  122 ++
 .../sort/iceberg/flink}/sink/DeltaManifests.java   |    5 +-
 .../flink}/sink/DeltaManifestsSerializer.java      |    5 +-
 .../iceberg/flink}/sink/FlinkManifestUtil.java     |   12 +-
 .../inlong/sort/iceberg/flink}/sink/FlinkSink.java | 1034 +++++++-------
 .../iceberg/flink}/sink/IcebergFilesCommitter.java |   91 +-
 .../iceberg/flink}/sink/IcebergStreamWriter.java   |   33 +-
 .../flink}/sink/ManifestOutputFileFactory.java     |   13 +-
 .../iceberg/flink}/sink/PartitionKeySelector.java  |    4 +-
 .../iceberg/flink/sink/PartitionedDeltaWriter.java |   91 ++
 .../flink/sink/RowDataTaskWriterFactory.java       |  140 ++
 .../flink/sink/UnpartitionedDeltaWriter.java       |   64 +
 .../org.apache.flink.table.factories.Factory       |   33 +
 .../apache/inlong/sort/iceberg/FlinkCatalog.java   |    2 +
 .../inlong/sort/iceberg/FlinkCatalogFactory.java   |    2 +
 .../sort/iceberg/FlinkDynamicTableFactory.java     |    2 +-
 .../inlong/sort/iceberg/IcebergTableSink.java      |    9 +
 .../sort/iceberg/sink/BaseDeltaTaskWriter.java     |  122 ++
 .../inlong/sort/iceberg/sink/DeltaManifests.java   |    3 +
 .../iceberg/sink/DeltaManifestsSerializer.java     |    3 +
 .../sort/iceberg/sink/FlinkManifestUtil.java       |    3 +
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |   28 +-
 .../sort/iceberg/sink/IcebergFilesCommitter.java   |    3 +
 .../sort/iceberg/sink/IcebergStreamWriter.java     |    3 +
 .../iceberg/sink/ManifestOutputFileFactory.java    |    3 +
 .../sort/iceberg/sink/PartitionKeySelector.java    |    2 +
 .../sort/iceberg/sink/PartitionedDeltaWriter.java  |   91 ++
 .../iceberg/sink/RowDataTaskWriterFactory.java     |  140 ++
 .../iceberg/sink/UnpartitionedDeltaWriter.java     |   64 +
 inlong-sort/sort-core/pom.xml                      |   10 +-
 .../inlong/sort/parser/DLCIcebergSqlParseTest.java |   10 +-
 licenses/inlong-manager/LICENSE                    |   10 +-
 licenses/inlong-sort-connectors/LICENSE            |   53 +-
 pom.xml                                            |   19 +-
 50 files changed, 3437 insertions(+), 1778 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DLCConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DLCConstant.java
index 1788cd5ba..e4ef7d310 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DLCConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DLCConstant.java
@@ -27,36 +27,45 @@ public class DLCConstant {
      * DLC internet access domain name.
      */
     public static final String DLC_ENDPOINT = "dlc.tencentcloudapi.com";
-
+    // ============================== DLC AUTH PARAMS(Required) =====================================
     /**
      * dlc account region
      */
-    public static final String DLC_REGION  = "qcloud.dlc.region";
+    public static final String DLC_REGION = "qcloud.dlc.region";
     /**
      * dlc account secret id
      */
-    public static final String DLC_SECRET_ID  = "qcloud.dlc.secret-id";
+    public static final String DLC_SECRET_ID = "qcloud.dlc.secret-id";
     /**
      * dlc account secret key
      */
-    public static final String DLC_SECRET_KEY  = "qcloud.dlc.secret-key";
-
+    public static final String DLC_SECRET_KEY = "qcloud.dlc.secret-key";
     /**
-     * dlc cos region
+     * Current user appid.
      */
-    public static final String FS_COS_REGION  = "fs.cosn.userinfo.region";
+    public static final String DLC_USER_APPID = "qcloud.dlc.user.appid";
     /**
-     * dlc main account cos secret id
+     * Managed account uid.
      */
-    public static final String FS_COS_SECRET_ID  = "fs.cosn.userinfo.secretId";
+    public static final String DLC_MANAGED_ACCOUNT_UID = "qcloud.dlc.managed.account.uid";
     /**
-     * dlc main account cos secret key
+     * dlc jdbc url(optional)
      */
-    public static final String FS_COS_SECRET_KEY  = "fs.cosn.userinfo.secretKey";
+    public static final String DLC_JDBC_URL = "qcloud.dlc.jdbc.url";
 
+    // ============================== FS CREDENTIALS AUTH PARAMS =====================================
     public static final String FS_LAKEFS_IMPL  = "fs.lakefs.impl";
     public static final String FS_COS_IMPL  = "fs.cosn.impl";
     public static final String FS_COS_AUTH_PROVIDER  = "fs.cosn.credentials.provider";
+    public static final String FS_COS_REGION  = "fs.cosn.userinfo.region";
+    public static final String FS_COS_SECRET_ID  = "fs.cosn.userinfo.secretId";
+    public static final String FS_COS_SECRET_KEY  = "fs.cosn.userinfo.secretKey";
+
+    public static final String FS_AUTH_DLC_SECRET_ID = "service.secret.id";
+    public static final String FS_AUTH_DLC_SECRET_KEY = "service.secret.key";
+    public static final String FS_AUTH_DLC_REGION  = "service.region";
+    public static final String FS_AUTH_DLC_ACCOUNT_APPID  = "user.appid";
+    public static final String FS_AUTH_DLC_MANAGED_ACCOUNT_UID  = "request.identity.token";
 
     public static final String DLC_CATALOG_IMPL_CLASS =
             "org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog";
@@ -65,7 +74,7 @@ public class DLCConstant {
                 {
                     put(FS_LAKEFS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
                     put(FS_COS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
-                    put(FS_COS_AUTH_PROVIDER, "org.apache.hadoop.fs.auth.SimpleCredentialProvider");
+                    put(FS_COS_AUTH_PROVIDER, "org.apache.hadoop.fs.auth.DlcCloudCredentialsProvider");
                 }
             });
 }
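
For context (not part of the patch): a minimal sketch of assembling the new auth constants above into a property map. The literal values are placeholders; DLC_DEFAULT_IMPL now points fs.cosn.credentials.provider at DlcCloudCredentialsProvider.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.inlong.sort.protocol.constant.DLCConstant;

    public class DlcAuthPropertiesSketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            // Required DLC auth params (values are placeholders).
            props.put(DLCConstant.DLC_REGION, "ap-beijing");
            props.put(DLCConstant.DLC_SECRET_ID, "<secret-id>");
            props.put(DLCConstant.DLC_SECRET_KEY, "<secret-key>");
            props.put(DLCConstant.DLC_USER_APPID, "<appid>");
            props.put(DLCConstant.DLC_MANAGED_ACCOUNT_UID, "<managed-account-uid>");
            // Filesystem defaults, including the new DlcCloudCredentialsProvider.
            props.putAll(DLCConstant.DLC_DEFAULT_IMPL);
            props.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }
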
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
index 5e0f1dc67..dcb55426a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
@@ -91,9 +91,7 @@ public class DLCIcebergLoadNode extends LoadNode implements InlongMetric, Serial
     @Override
     public Map<String, String> tableOptions() {
         Map<String, String> options = super.tableOptions();
-        options.put("connector", "iceberg-inlong");
-        // for test sink.ignore.changelog
-        // options.put("sink.ignore.changelog", "true");
+        options.put("connector", "dlc-inlong");
         options.put("catalog-database", dbName);
         options.put("catalog-table", tableName);
         options.put("default-database", dbName);
@@ -106,6 +104,16 @@ public class DLCIcebergLoadNode extends LoadNode implements InlongMetric, Serial
             options.put("warehouse", warehouse);
         }
         options.putAll(DLCConstant.DLC_DEFAULT_IMPL);
+        // for filesystem auth
+        options.put(DLCConstant.FS_COS_REGION, options.get(DLCConstant.DLC_REGION));
+        options.put(DLCConstant.FS_COS_SECRET_ID, options.get(DLCConstant.DLC_SECRET_ID));
+        options.put(DLCConstant.FS_COS_SECRET_KEY, options.get(DLCConstant.DLC_SECRET_KEY));
+
+        options.put(DLCConstant.FS_AUTH_DLC_SECRET_ID, options.get(DLCConstant.DLC_SECRET_ID));
+        options.put(DLCConstant.FS_AUTH_DLC_SECRET_KEY, options.get(DLCConstant.DLC_SECRET_KEY));
+        options.put(DLCConstant.FS_AUTH_DLC_REGION, options.get(DLCConstant.DLC_REGION));
+        options.put(DLCConstant.FS_AUTH_DLC_ACCOUNT_APPID, options.get(DLCConstant.DLC_USER_APPID));
+        options.put(DLCConstant.FS_AUTH_DLC_MANAGED_ACCOUNT_UID, options.get(DLCConstant.DLC_MANAGED_ACCOUNT_UID));
         return options;
     }
 
@@ -129,9 +137,8 @@ public class DLCIcebergLoadNode extends LoadNode implements InlongMetric, Serial
         Preconditions.checkNotNull(properties.get(DLCConstant.DLC_SECRET_ID), "dlc secret-id is null");
         Preconditions.checkNotNull(properties.get(DLCConstant.DLC_SECRET_KEY), "dlc secret-key is null");
         Preconditions.checkNotNull(properties.get(DLCConstant.DLC_REGION), "dlc region is null");
-
-        Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_REGION), "cos region is null");
-        Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_SECRET_ID), "cos secret-id is null");
-        Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_SECRET_KEY), "cos secret-key is null");
+        Preconditions.checkNotNull(properties.get(DLCConstant.DLC_USER_APPID), "dlc user appid is null");
+        Preconditions.checkNotNull(
+                properties.get(DLCConstant.DLC_MANAGED_ACCOUNT_UID), "dlc managed account appid is null");
     }
 }
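
To make the tableOptions() change above easier to follow, here is a simplified, stand-alone mirror of the new derivation (a sketch, not the production code): the five user-supplied DLC_* keys are copied into both the COS userinfo keys and the DlcCloudCredentialsProvider keys, so fs.cosn.* credentials no longer need to be configured separately.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.inlong.sort.protocol.constant.DLCConstant;

    public class FsAuthDerivationSketch {
        // Mirrors, in simplified form, what DLCIcebergLoadNode#tableOptions now does.
        static Map<String, String> deriveFsAuth(Map<String, String> options) {
            Map<String, String> out = new HashMap<>(options);
            out.put(DLCConstant.FS_COS_REGION, options.get(DLCConstant.DLC_REGION));
            out.put(DLCConstant.FS_COS_SECRET_ID, options.get(DLCConstant.DLC_SECRET_ID));
            out.put(DLCConstant.FS_COS_SECRET_KEY, options.get(DLCConstant.DLC_SECRET_KEY));
            out.put(DLCConstant.FS_AUTH_DLC_REGION, options.get(DLCConstant.DLC_REGION));
            out.put(DLCConstant.FS_AUTH_DLC_SECRET_ID, options.get(DLCConstant.DLC_SECRET_ID));
            out.put(DLCConstant.FS_AUTH_DLC_SECRET_KEY, options.get(DLCConstant.DLC_SECRET_KEY));
            out.put(DLCConstant.FS_AUTH_DLC_ACCOUNT_APPID, options.get(DLCConstant.DLC_USER_APPID));
            out.put(DLCConstant.FS_AUTH_DLC_MANAGED_ACCOUNT_UID, options.get(DLCConstant.DLC_MANAGED_ACCOUNT_UID));
            return out;
        }
    }
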
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNodeTest.java
index 0f690b75a..a12ab5f0a 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNodeTest.java
@@ -38,10 +38,9 @@ public class DLCIcebergLoadNodeTest extends SerializeBaseTest<DLCIcebergLoadNode
         properties.put(DLCConstant.DLC_REGION, "ap-beijing");
         properties.put(DLCConstant.DLC_SECRET_ID, "XXXXXXXXXXX");
         properties.put(DLCConstant.DLC_SECRET_KEY, "XXXXXXXXXXX");
+        properties.put(DLCConstant.DLC_USER_APPID, "XXXXXXXXXXX");
+        properties.put(DLCConstant.DLC_MANAGED_ACCOUNT_UID, "XXXXXXXXXXX");
 
-        properties.put(DLCConstant.FS_COS_REGION, "ap-beijing");
-        properties.put(DLCConstant.FS_COS_SECRET_ID, "XXXXXXXXXXX");
-        properties.put(DLCConstant.FS_COS_SECRET_KEY, "XXXXXXXXXXX");
         return new DLCIcebergLoadNode(
                 "iceberg_dlc",
                 "iceberg_dlc_output",
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
new file mode 100644
index 000000000..d2179ae54
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
@@ -0,0 +1,89 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
+
+import javax.annotation.Nullable;
+import java.util.HashSet;
+import java.util.regex.Pattern;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+
+public class MetricOption {
+    private static final String IP_OR_HOST_PORT = "^(.*):([0-9]|[1-9]\\d|[1-9]\\d{"
+            + "2}|[1-9]\\d{"
+            + "3}|[1-5]\\d{"
+            + "4}|6[0-4]\\d{"
+            + "3}|65[0-4]\\d{"
+            + "2}|655[0-2]\\d|6553[0-5])$";
+
+    private final String groupId;
+    private final String streamId;
+    private final String nodeId;
+    private final HashSet<String> ipPortList;
+    private String ipPorts;
+
+    public MetricOption(String inLongMetric) {
+        this(inLongMetric, null);
+    }
+
+    public MetricOption(String inLongMetric, @Nullable String inLongAudit) {
+        ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inLongMetric, inLongAudit);
+        String[] inLongMetricArray = inLongMetric.split(DELIMITER);
+        Preconditions.checkArgument(inLongMetricArray.length == 3,
+                "Error inLong metric format: " + inLongMetric);
+        this.groupId = inLongMetricArray[0];
+        this.streamId = inLongMetricArray[1];
+        this.nodeId = inLongMetricArray[2];
+        this.ipPortList = new HashSet<>();
+        this.ipPorts = null;
+
+        if (inLongAudit != null) {
+            String[] ipPortStrs = inLongAudit.split(DELIMITER);
+            this.ipPorts = inLongAudit;
+            for (String ipPort : ipPortStrs) {
+                Preconditions.checkArgument(Pattern.matches(IP_OR_HOST_PORT, ipPort),
+                        "Error inLong audit format: " + inLongAudit);
+                this.ipPortList.add(ipPort);
+            }
+        }
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public HashSet<String> getIpPortList() {
+        return ipPortList;
+    }
+
+    public String getIpPorts() {
+        return ipPorts;
+    }
+}
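
A quick usage sketch of the new MetricOption class (not from the patch). It assumes the inLongMetric label is the three-part groupId/streamId/nodeId string joined by Constants.DELIMITER; the literal '&' and the audit endpoint below are illustrative assumptions only.

    import org.apache.inlong.sort.base.metric.MetricOption;

    public class MetricOptionSketch {
        public static void main(String[] args) {
            // Assumption: '&' is the configured Constants.DELIMITER; adjust if the deployment uses another value.
            MetricOption option = new MetricOption("groupId&streamId&nodeId", "127.0.0.1:10081");
            System.out.println(option.getGroupId());    // groupId
            System.out.println(option.getStreamId());   // streamId
            System.out.println(option.getNodeId());     // nodeId
            System.out.println(option.getIpPortList()); // [127.0.0.1:10081]
        }
    }
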
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 0daff99a9..67b47657e 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -56,10 +56,11 @@ public class SinkMetricData implements MetricData {
     private Meter numBytesOutPerSecond;
 
     public SinkMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
-        this.metricGroup = metricGroup;
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.nodeId = nodeId;
+        this(groupId, streamId, nodeId, metricGroup, null);
+    }
+
+    public SinkMetricData(MetricOption option, MetricGroup metricGroup) {
+        this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts());
     }
 
     public SinkMetricData(
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index bd82cc041..d97efc9f5 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -25,6 +25,12 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.inlong.audit.AuditImp;
 import org.apache.inlong.sort.base.Constants;
 
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
@@ -46,7 +52,11 @@ public class SourceMetricData implements MetricData {
     private final AuditImp auditImp;
 
     public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
-        this(groupId, streamId, nodeId, metricGroup, null);
+        this(groupId, streamId, nodeId, metricGroup, (AuditImp) null);
+    }
+
+    public SourceMetricData(MetricOption option, MetricGroup metricGroup) {
+        this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts());
     }
 
     public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup,
@@ -58,6 +68,20 @@ public class SourceMetricData implements MetricData {
         this.auditImp = auditImp;
     }
 
+    public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup,
+            @Nullable String auditHostAndPorts) {
+        this.groupId = groupId;
+        this.streamId = streamId;
+        this.nodeId = nodeId;
+        this.metricGroup = metricGroup;
+        if (auditHostAndPorts != null) {
+            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+            this.auditImp = AuditImp.getInstance();
+        } else {
+            this.auditImp = null;
+        }
+    }
+
     /**
      * Default counter is {@link SimpleCounter}
      * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
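
SinkMetricData and SourceMetricData both gain MetricOption-based constructors. Below is a hypothetical sketch (class name and wiring are assumptions, not from the patch) of using the new SourceMetricData constructor inside a Flink RichSourceFunction, where getRuntimeContext() supplies the MetricGroup.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.SourceMetricData;

    public class MeteredSourceSketch extends RichSourceFunction<String> {
        private final String inlongMetric; // e.g. "groupId&streamId&nodeId" (delimiter assumed)
        private final String inlongAudit;  // e.g. "127.0.0.1:10081", may be null
        private transient SourceMetricData sourceMetricData;

        public MeteredSourceSketch(String inlongMetric, String inlongAudit) {
            this.inlongMetric = inlongMetric;
            this.inlongAudit = inlongAudit;
        }

        @Override
        public void open(Configuration parameters) {
            MetricOption option = new MetricOption(inlongMetric, inlongAudit);
            // When inlongAudit is set, the new constructor initializes the shared AuditImp proxy with those endpoints.
            sourceMetricData = new SourceMetricData(option, getRuntimeContext().getMetricGroup());
        }

        @Override
        public void run(SourceContext<String> ctx) {
            // Emission logic omitted in this sketch.
        }

        @Override
        public void cancel() {
        }
    }
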
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/pom.xml b/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
index be760589e..86d0752c3 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
+++ b/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
@@ -36,6 +36,20 @@
     </description>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.qcloud.cos</groupId>
+            <artifactId>lakefs-cloud-plugin</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.qcloud</groupId>
+            <artifactId>dlc-jdbc</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>com.qcloud.cos</groupId>
             <artifactId>hadoop-cos</artifactId>
@@ -46,11 +60,12 @@
         </dependency>
         <dependency>
             <groupId>com.tencentcloudapi</groupId>
-            <artifactId>tencentcloud-sdk-java</artifactId>
+            <artifactId>tencentcloud-sdk-java</artifactId><!--TODO:replaced by 558-SNAPSHOT with dlc special treated-->
         </dependency>
         <dependency>
             <groupId>com.qcloud</groupId>
             <artifactId>dlc-data-catalog-metastore-client</artifactId>
+            <version>1.1.1</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.hive</groupId>
@@ -86,11 +101,9 @@
             <groupId>org.ini4j</groupId>
             <artifactId>ini4j</artifactId>
         </dependency>
-
         <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-connector-iceberg</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-runtime-1.13</artifactId>
         </dependency>
 
         <dependency>
@@ -141,12 +154,15 @@
                                     <include>com.qcloud.cos:*</include>
                                     <include>com.qcloud:*</include>
                                     <include>com.tencentcloudapi:*</include>
+                                    <include>com.tencent.cloud.dlc:*</include>
                                     <include>commons-logging:commons-logging</include>
+                                    <include>commons-codec:commons-codec</include>
                                     <include>com.squareup.okio:okio</include>
                                     <include>com.squareup.okhttp:okhttp</include>
                                     <include>com.squareup.okhttp:logging-interceptor</include>
                                     <include>org.ini4j:ini4j</include>
                                     <include>com.google.code.gson:gson</include>
+                                    <include>javax.xml.bind:jaxb-api</include>
                                 </includes>
                             </artifactSet>
                             <filters>
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DlcWrappedHybrisCatalog.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DlcWrappedHybrisCatalog.java
index a8f64db52..c303e21e2 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DlcWrappedHybrisCatalog.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DlcWrappedHybrisCatalog.java
@@ -23,11 +23,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import com.qcloud.dlc.common.Constants;
 import com.qcloud.dlc.metastore.DLCDataCatalogMetastoreClient;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CosNConfigKeys;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.auth.DlcCloudCredentialsProvider;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -71,10 +75,25 @@ import org.slf4j.LoggerFactory;
 public class DlcWrappedHybrisCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
     public static final String LIST_ALL_TABLES = "list-all-tables";
     public static final String LIST_ALL_TABLES_DEFAULT = "false";
-
-    // dlc auth
-    public static final String DLC_ENDPOINT = "qcloud.dlc.endpoint";
-
+    public static final Set<String> DLC_WHITELIST_PARAMS = Stream.of(
+            Constants.DLC_REGION_CONF,
+            Constants.DLC_ENDPOINT,
+            Constants.DLC_REGION_CONF,
+            Constants.DLC_SECRET_ID_CONF,
+            Constants.DLC_SECRET_KEY_CONF,
+            DlcCloudCredentialsProvider.END_POINT,
+            DlcCloudCredentialsProvider.SECRET_ID,
+            DlcCloudCredentialsProvider.SECRET_KEY,
+            DlcCloudCredentialsProvider.REGION,
+            DlcCloudCredentialsProvider.USER_APPID,
+            DlcCloudCredentialsProvider.REQUEST_IDENTITY_TOKEN,
+            CosNConfigKeys.COSN_USERINFO_SECRET_ID_KEY,
+            CosNConfigKeys.COSN_USERINFO_SECRET_KEY_KEY,
+            CosNConfigKeys.COSN_REGION_PREV_KEY,
+            CosNConfigKeys.COSN_CREDENTIALS_PROVIDER,
+            "fs.lakefs.impl",
+            "fs.cosn.impl"
+        ).collect(Collectors.toSet());
 
     private static final Logger LOG = LoggerFactory.getLogger(DlcWrappedHybrisCatalog.class);
 
@@ -95,8 +114,14 @@ public class DlcWrappedHybrisCatalog extends BaseMetastoreCatalog implements Sup
             this.conf = new Configuration();
         }
 
+        // dlc auth
+        properties.entrySet().stream()
+                .filter(entry -> DLC_WHITELIST_PARAMS.contains(entry.getKey()))
+                .forEach(entry -> this.conf.set(entry.getKey(), entry.getValue()));
+
         if (properties.containsKey(CatalogProperties.URI)) {
-            this.conf.set(DLC_ENDPOINT, properties.get(CatalogProperties.URI));
+            this.conf.set(Constants.DLC_ENDPOINT, properties.get(CatalogProperties.URI));
+            this.conf.set(DlcCloudCredentialsProvider.END_POINT, properties.get(CatalogProperties.URI));
         }
 
         if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
@@ -104,16 +129,6 @@ public class DlcWrappedHybrisCatalog extends BaseMetastoreCatalog implements Sup
                     properties.get(CatalogProperties.WAREHOUSE_LOCATION));
         }
 
-        // dlc auth
-        properties.entrySet().stream()
-                .filter(entry -> entry.getKey().startsWith("qcloud.dlc"))
-                .forEach(entry -> this.conf.set(entry.getKey(), entry.getValue()));
-
-        // lakefs auth
-        properties.entrySet().stream()
-                .filter(entry -> entry.getKey().startsWith("fs"))
-                .forEach(entry -> this.conf.set(entry.getKey(), entry.getValue()));
-
         this.listAllTables = Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));
 
         String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
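
The initialize() change above replaces prefix-based property copying with an explicit whitelist (DLC_WHITELIST_PARAMS). A generic, self-contained sketch of that pattern, using a few of the whitelisted keys for illustration:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class WhitelistCopySketch {
        private static final Set<String> WHITELIST = Stream.of(
                "service.secret.id", "service.secret.key", "service.region")
                .collect(Collectors.toSet());

        // Copy only whitelisted entries into the target configuration map.
        static Map<String, String> copyWhitelisted(Map<String, String> source) {
            return source.entrySet().stream()
                    .filter(e -> WHITELIST.contains(e.getKey()))
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        }

        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("service.secret.id", "<secret-id>");
            props.put("unrelated.key", "dropped");
            System.out.println(copyWhitelisted(props)); // only the whitelisted key survives
        }
    }

The design choice keeps unrelated catalog options out of the Hadoop Configuration instead of forwarding every qcloud.dlc.* or fs.* prefixed key as before.
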
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/CompactTableProperties.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/CompactTableProperties.java
new file mode 100644
index 000000000..08efdc00d
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/CompactTableProperties.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class CompactTableProperties {
+    public static final String COMPACT_PREFIX = "write.compact.";
+
+    public static final String COMPACT_ENABLED = "write.compact.enable";
+    public static final boolean COMPACT_ENABLED_DEFAULT = false;
+
+    public static final String COMPACT_INTERVAL = "write.compact.snapshot.interval";
+    public static final int COMPACT_INTERVAL_DEFAULT = 5;
+
+    public static final String COMPACT_RESOUCE_POOL = "write.compact.resource.name";
+    public static final String COMPACT_RESOUCE_POOL_DEFAULT = "default";
+
+    // Supported by spark rewrite action option
+    public static final String COMPACT_MAX_CONCURRENT_FILE_GROUP_REWRITES
+            = "write.compact.max-concurrent-file-group-rewrites";
+    public static final int COMPACT_MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT = 1;
+
+    public static final String COMPACT_MAX_FILE_GROUP_SIZE_BYTES = "write.compact.max-file-group-size-bytes";
+    public static final long COMPACT_MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+    public static final String COMPACT_PARTIAL_PROGRESS_ENABLED = "write.compact.partial-progress.enabled";
+    public static final boolean COMPACT_PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+    public static final String COMPACT_PARTIAL_PROGRESS_MAX_COMMITS = "write.compact.partial-progress.max-commits";
+    public static final int COMPACT_PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+    public static final String COMPACT_TARGET_FILE_SIZE_BYTES = "write.compact.target-file-size-bytes";
+    public static final int COMPACT_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024; // 512 MB
+
+    public static final String COMPACT_USE_STARTING_SEQUENCE_NUMBER = "write.compact.use-starting-sequence-number";
+    public static final boolean COMPACT_USE_STARTING_SEQUENCE_NUMBER_DEFAULT = true;
+
+    public static final String COMPACT_MIN_INPUT_FILES = "write.compact.min-input-files";
+    public static final int COMPACT_MIN_INPUT_FILES_DEFAULT = 5;
+
+    public static final String COMPACT_DELETE_FILE_THRESHOLD = "write.compact.delete-file-threshold";
+    public static final int COMPACT_DELETE_FILE_THRESHOLD_DEFAULT = Integer.MAX_VALUE;
+
+    public static final String COMPACT_MIN_FILE_SIZE_BYTES = "write.compact.min-file-size-bytes";
+    public static final double COMPACT_MIN_FILE_SIZE_BYTES_DEFAULT = 0.75d * COMPACT_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+    public static final String COMPACT_MAX_FILE_SIZE_BYTES = "write.compact.max-file-size-bytes";
+    public static final double COMPACT_MAX_FILE_SIZE_BYTES_DEFAULT = 1.80d * COMPACT_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+    public static final Set<String> TABLE_AUTO_COMPACT_PROPERTIES = Stream.of(
+                COMPACT_ENABLED,
+                COMPACT_INTERVAL,
+                COMPACT_RESOUCE_POOL,
+                COMPACT_MAX_CONCURRENT_FILE_GROUP_REWRITES,
+                COMPACT_MAX_FILE_GROUP_SIZE_BYTES,
+                COMPACT_PARTIAL_PROGRESS_ENABLED,
+                COMPACT_PARTIAL_PROGRESS_MAX_COMMITS,
+                COMPACT_TARGET_FILE_SIZE_BYTES,
+                COMPACT_USE_STARTING_SEQUENCE_NUMBER,
+                COMPACT_MIN_INPUT_FILES,
+                COMPACT_DELETE_FILE_THRESHOLD,
+                COMPACT_MIN_FILE_SIZE_BYTES,
+                COMPACT_MAX_FILE_SIZE_BYTES
+        ).collect(Collectors.toSet());
+
+    public static final Set<String> ACTION_AUTO_COMPACT_OPTIONS = Stream.of(
+            COMPACT_MAX_CONCURRENT_FILE_GROUP_REWRITES,
+            COMPACT_MAX_FILE_GROUP_SIZE_BYTES,
+            COMPACT_PARTIAL_PROGRESS_ENABLED,
+            COMPACT_PARTIAL_PROGRESS_MAX_COMMITS,
+            COMPACT_TARGET_FILE_SIZE_BYTES,
+            COMPACT_USE_STARTING_SEQUENCE_NUMBER,
+            COMPACT_MIN_INPUT_FILES,
+            COMPACT_DELETE_FILE_THRESHOLD,
+            COMPACT_MIN_FILE_SIZE_BYTES,
+            COMPACT_MAX_FILE_SIZE_BYTES
+    ).collect(Collectors.toSet());
+}
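
A sketch (not from the patch; values are illustrative) of setting the compaction properties defined above on a DLC Iceberg table's options map to turn on the new small-file compaction:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.inlong.sort.iceberg.flink.CompactTableProperties;

    public class CompactOptionsSketch {
        public static void main(String[] args) {
            Map<String, String> tableProps = new HashMap<>();
            // Enable compaction and keep the default snapshot interval (5).
            tableProps.put(CompactTableProperties.COMPACT_ENABLED, "true");
            tableProps.put(CompactTableProperties.COMPACT_INTERVAL,
                    String.valueOf(CompactTableProperties.COMPACT_INTERVAL_DEFAULT));
            // Target roughly 512 MB output files, the class default.
            tableProps.put(CompactTableProperties.COMPACT_TARGET_FILE_SIZE_BYTES,
                    String.valueOf(CompactTableProperties.COMPACT_TARGET_FILE_SIZE_BYTES_DEFAULT));
            tableProps.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }
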
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalog.java
similarity index 95%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalog.java
index 2056a8d02..695bf2ea6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalog.java
@@ -1,731 +1,732 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.inlong.sort.iceberg;
-
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.AbstractCatalog;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogPartition;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
-import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.factories.Factory;
-import org.apache.flink.util.StringUtils;
-import org.apache.iceberg.CachingCatalog;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.Transaction;
-import org.apache.iceberg.UpdateProperties;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.SupportsNamespaces;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.AlreadyExistsException;
-import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
-import org.apache.iceberg.exceptions.NoSuchNamespaceException;
-import org.apache.iceberg.flink.CatalogLoader;
-import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}.
- * <p>
- * The mapping between Flink database and Iceberg namespace:
- * Supplying a base namespace for a given catalog, so if you have a catalog that supports a 2-level namespace, you
- * would supply the first level in the catalog configuration and the second level would be exposed as Flink databases.
- * </p>
- * The Iceberg table manages its partitions by itself. The partition of the Iceberg table is independent of the
- * partition of Flink.
- */
-public class FlinkCatalog extends AbstractCatalog {
-
-    private final CatalogLoader catalogLoader;
-    private final Catalog icebergCatalog;
-    private final Namespace baseNamespace;
-    private final SupportsNamespaces asNamespaceCatalog;
-    private final Closeable closeable;
-    private final boolean cacheEnabled;
-
-    public FlinkCatalog(
-            String catalogName,
-            String defaultDatabase,
-            Namespace baseNamespace,
-            CatalogLoader catalogLoader,
-            boolean cacheEnabled) {
-        super(catalogName, defaultDatabase);
-        this.catalogLoader = catalogLoader;
-        this.baseNamespace = baseNamespace;
-        this.cacheEnabled = cacheEnabled;
-
-        Catalog originalCatalog = catalogLoader.loadCatalog();
-        icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
-        asNamespaceCatalog =
-                originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
-        closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
-    }
-
-    private static void validateFlinkTable(CatalogBaseTable table) {
-        Preconditions.checkArgument(table instanceof CatalogTable, "The Table should be a CatalogTable.");
-
-        TableSchema schema = table.getSchema();
-        schema.getTableColumns().forEach(column -> {
-            if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
-                throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
-            }
-        });
-
-        if (!schema.getWatermarkSpecs().isEmpty()) {
-            throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");
-        }
-    }
-
-    private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {
-        PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
-        partitionKeys.forEach(builder::identity);
-        return builder.build();
-    }
-
-    private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {
-        List<String> partitionKeys = Lists.newArrayList();
-        for (PartitionField field : spec.fields()) {
-            if (field.transform().isIdentity()) {
-                partitionKeys.add(icebergSchema.findColumnName(field.sourceId()));
-            } else {
-                // Not created by Flink SQL.
-                // For compatibility with iceberg tables, return empty.
-                // TODO modify this after Flink support partition transform.
-                return Collections.emptyList();
-            }
-        }
-        return partitionKeys;
-    }
-
-    private static void commitChanges(Table table, String setLocation, String setSnapshotId,
-            String pickSnapshotId, Map<String, String> setProperties) {
-        // don't allow setting the snapshot and picking a commit
-        // at the same time because order is ambiguous and choosing
-        // one order leads to different results
-        Preconditions.checkArgument(setSnapshotId == null || pickSnapshotId == null,
-                "Cannot set the current snapshot ID and cherry-pick snapshot changes");
-
-        if (setSnapshotId != null) {
-            long newSnapshotId = Long.parseLong(setSnapshotId);
-            table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
-        }
-
-        // if updating the table snapshot, perform that update first in case it fails
-        if (pickSnapshotId != null) {
-            long newSnapshotId = Long.parseLong(pickSnapshotId);
-            table.manageSnapshots().cherrypick(newSnapshotId).commit();
-        }
-
-        Transaction transaction = table.newTransaction();
-
-        if (setLocation != null) {
-            transaction.updateLocation()
-                    .setLocation(setLocation)
-                    .commit();
-        }
-
-        if (!setProperties.isEmpty()) {
-            UpdateProperties updateProperties = transaction.updateProperties();
-            setProperties.forEach((k, v) -> {
-                if (v == null) {
-                    updateProperties.remove(k);
-                } else {
-                    updateProperties.set(k, v);
-                }
-            });
-            updateProperties.commit();
-        }
-
-        transaction.commitTransaction();
-    }
-
-    static CatalogTable toCatalogTable(Table table) {
-        TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
-        List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
-
-        // NOTE: We can not create a IcebergCatalogTable extends CatalogTable, because Flink optimizer may use
-        // CatalogTableImpl to copy a new catalog table.
-        // Let's re-loading table from Iceberg catalog when creating source/sink operators.
-        // Iceberg does not have Table comment, so pass a null (Default comment value in Flink).
-        return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
-    }
-
-    @Override
-    public void open() throws CatalogException {
-        // Create the default database if it does not exist.
-        try {
-            createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
-        } catch (DatabaseAlreadyExistException e) {
-            // Ignore the exception if it's already exist.
-        }
-    }
-
-    @Override
-    public void close() throws CatalogException {
-        if (closeable != null) {
-            try {
-                closeable.close();
-            } catch (IOException e) {
-                throw new CatalogException(e);
-            }
-        }
-    }
-
-    public Catalog catalog() {
-        return icebergCatalog;
-    }
-
-    private Namespace toNamespace(String database) {
-        String[] namespace = new String[baseNamespace.levels().length + 1];
-        System.arraycopy(baseNamespace.levels(), 0, namespace, 0, baseNamespace.levels().length);
-        namespace[baseNamespace.levels().length] = database;
-        return Namespace.of(namespace);
-    }
-
-    TableIdentifier toIdentifier(ObjectPath path) {
-        return TableIdentifier.of(toNamespace(path.getDatabaseName()), path.getObjectName());
-    }
-
-    @Override
-    public List<String> listDatabases() throws CatalogException {
-        if (asNamespaceCatalog == null) {
-            return Collections.singletonList(getDefaultDatabase());
-        }
-
-        return asNamespaceCatalog.listNamespaces(baseNamespace).stream()
-                .map(n -> n.level(n.levels().length - 1))
-                .collect(Collectors.toList());
-    }
-
-    @Override
-    public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
-        if (asNamespaceCatalog == null) {
-            if (!getDefaultDatabase().equals(databaseName)) {
-                throw new DatabaseNotExistException(getName(), databaseName);
-            } else {
-                return new CatalogDatabaseImpl(Maps.newHashMap(), "");
-            }
-        } else {
-            try {
-                Map<String, String> metadata =
-                        Maps.newHashMap(asNamespaceCatalog.loadNamespaceMetadata(toNamespace(databaseName)));
-                String comment = metadata.remove("comment");
-                return new CatalogDatabaseImpl(metadata, comment);
-            } catch (NoSuchNamespaceException e) {
-                throw new DatabaseNotExistException(getName(), databaseName, e);
-            }
-        }
-    }
-
-    @Override
-    public boolean databaseExists(String databaseName) throws CatalogException {
-        try {
-            getDatabase(databaseName);
-            return true;
-        } catch (DatabaseNotExistException ignore) {
-            return false;
-        }
-    }
-
-    @Override
-    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
-            throws DatabaseAlreadyExistException, CatalogException {
-        createDatabase(name, mergeComment(database.getProperties(), database.getComment()), ignoreIfExists);
-    }
-
-    private void createDatabase(String databaseName, Map<String, String> metadata, boolean ignoreIfExists)
-            throws DatabaseAlreadyExistException, CatalogException {
-        if (asNamespaceCatalog != null) {
-            try {
-                asNamespaceCatalog.createNamespace(toNamespace(databaseName), metadata);
-            } catch (AlreadyExistsException e) {
-                if (!ignoreIfExists) {
-                    throw new DatabaseAlreadyExistException(getName(), databaseName, e);
-                }
-            }
-        } else {
-            throw new UnsupportedOperationException("Namespaces are not supported by catalog: " + getName());
-        }
-    }
-
-    private Map<String, String> mergeComment(Map<String, String> metadata, String comment) {
-        Map<String, String> ret = Maps.newHashMap(metadata);
-        if (metadata.containsKey("comment")) {
-            throw new CatalogException("Database properties should not contain key: 'comment'.");
-        }
-
-        if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
-            ret.put("comment", comment);
-        }
-        return ret;
-    }
-
-    @Override
-    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
-            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
-        if (asNamespaceCatalog != null) {
-            try {
-                boolean success = asNamespaceCatalog.dropNamespace(toNamespace(name));
-                if (!success && !ignoreIfNotExists) {
-                    throw new DatabaseNotExistException(getName(), name);
-                }
-            } catch (NoSuchNamespaceException e) {
-                if (!ignoreIfNotExists) {
-                    throw new DatabaseNotExistException(getName(), name, e);
-                }
-            } catch (NamespaceNotEmptyException e) {
-                throw new DatabaseNotEmptyException(getName(), name, e);
-            }
-        } else {
-            if (!ignoreIfNotExists) {
-                throw new DatabaseNotExistException(getName(), name);
-            }
-        }
-    }
-
-    @Override
-    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
-            throws DatabaseNotExistException, CatalogException {
-        if (asNamespaceCatalog != null) {
-            Namespace namespace = toNamespace(name);
-            Map<String, String> updates = Maps.newHashMap();
-            Set<String> removals = Sets.newHashSet();
-
-            try {
-                Map<String, String> oldProperties = asNamespaceCatalog.loadNamespaceMetadata(namespace);
-                Map<String, String> newProperties = mergeComment(newDatabase.getProperties(), newDatabase.getComment());
-
-                for (String key : oldProperties.keySet()) {
-                    if (!newProperties.containsKey(key)) {
-                        removals.add(key);
-                    }
-                }
-
-                for (Map.Entry<String, String> entry : newProperties.entrySet()) {
-                    if (!entry.getValue().equals(oldProperties.get(entry.getKey()))) {
-                        updates.put(entry.getKey(), entry.getValue());
-                    }
-                }
-
-                if (!updates.isEmpty()) {
-                    asNamespaceCatalog.setProperties(namespace, updates);
-                }
-
-                if (!removals.isEmpty()) {
-                    asNamespaceCatalog.removeProperties(namespace, removals);
-                }
-
-            } catch (NoSuchNamespaceException e) {
-                if (!ignoreIfNotExists) {
-                    throw new DatabaseNotExistException(getName(), name, e);
-                }
-            }
-        } else {
-            if (getDefaultDatabase().equals(name)) {
-                throw new CatalogException(
-                        "Can not alter the default database when the iceberg catalog doesn't support namespaces.");
-            }
-            if (!ignoreIfNotExists) {
-                throw new DatabaseNotExistException(getName(), name);
-            }
-        }
-    }
-
-    @Override
-    public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
-        try {
-            return icebergCatalog.listTables(toNamespace(databaseName)).stream()
-                    .map(TableIdentifier::name)
-                    .collect(Collectors.toList());
-        } catch (NoSuchNamespaceException e) {
-            throw new DatabaseNotExistException(getName(), databaseName, e);
-        }
-    }
-
-    @Override
-    public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
-        Table table = loadIcebergTable(tablePath);
-        return toCatalogTable(table);
-    }
-
-    private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
-        try {
-            Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
-            if (cacheEnabled) {
-                table.refresh();
-            }
-
-            return table;
-        } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
-            throw new TableNotExistException(getName(), tablePath, e);
-        }
-    }
-
-    @Override
-    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
-        return icebergCatalog.tableExists(toIdentifier(tablePath));
-    }
-
-    @Override
-    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-            throws TableNotExistException, CatalogException {
-        try {
-            icebergCatalog.dropTable(toIdentifier(tablePath));
-        } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
-            if (!ignoreIfNotExists) {
-                throw new TableNotExistException(getName(), tablePath, e);
-            }
-        }
-    }
-
-    @Override
-    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
-            throws TableNotExistException, TableAlreadyExistException, CatalogException {
-        try {
-            icebergCatalog.renameTable(
-                    toIdentifier(tablePath),
-                    toIdentifier(new ObjectPath(tablePath.getDatabaseName(), newTableName)));
-        } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
-            if (!ignoreIfNotExists) {
-                throw new TableNotExistException(getName(), tablePath, e);
-            }
-        } catch (AlreadyExistsException e) {
-            throw new TableAlreadyExistException(getName(), tablePath, e);
-        }
-    }
-
-    @Override
-    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
-            throws CatalogException, TableAlreadyExistException {
-        if (Objects.equals(table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
-            throw new IllegalArgumentException("Cannot create the table with 'connector'='iceberg' table property in "
-                    + "an iceberg catalog, Please create table with 'connector'='iceberg' "
-                    + "property in a non-iceberg catalog "
-                    + "or create table without 'connector'='iceberg' related properties in an iceberg table.");
-        }
-
-        createIcebergTable(tablePath, table, ignoreIfExists);
-    }
-
-    void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
-            throws CatalogException, TableAlreadyExistException {
-        validateFlinkTable(table);
-
-        Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
-        PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
-
-        ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
-        String location = null;
-        for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
-            if ("location".equalsIgnoreCase(entry.getKey())) {
-                location = entry.getValue();
-            } else {
-                properties.put(entry.getKey(), entry.getValue());
-            }
-        }
-
-        try {
-            icebergCatalog.createTable(
-                    toIdentifier(tablePath),
-                    icebergSchema,
-                    spec,
-                    location,
-                    properties.build());
-        } catch (AlreadyExistsException e) {
-            if (!ignoreIfExists) {
-                throw new TableAlreadyExistException(getName(), tablePath, e);
-            }
-        }
-    }
-
-    @Override
-    public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
-            throws CatalogException, TableNotExistException {
-        validateFlinkTable(newTable);
-
-        Table icebergTable;
-        try {
-            icebergTable = loadIcebergTable(tablePath);
-        } catch (TableNotExistException e) {
-            if (!ignoreIfNotExists) {
-                throw e;
-            } else {
-                return;
-            }
-        }
-
-        CatalogTable table = toCatalogTable(icebergTable);
-
-        // Currently, Flink SQL only support altering table properties.
-
-        // For current Flink Catalog API, support for adding/removing/renaming columns cannot be done by comparing
-        // CatalogTable instances, unless the Flink schema contains Iceberg column IDs.
-        if (!table.getSchema().equals(newTable.getSchema())) {
-            throw new UnsupportedOperationException("Altering schema is not supported yet.");
-        }
-
-        if (!table.getPartitionKeys().equals(((CatalogTable) newTable).getPartitionKeys())) {
-            throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
-        }
-
-        Map<String, String> oldProperties = table.getOptions();
-        Map<String, String> setProperties = Maps.newHashMap();
-
-        String setLocation = null;
-        String setSnapshotId = null;
-        String pickSnapshotId = null;
-
-        for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
-            String key = entry.getKey();
-            String value = entry.getValue();
-
-            if (Objects.equals(value, oldProperties.get(key))) {
-                continue;
-            }
-
-            if ("location".equalsIgnoreCase(key)) {
-                setLocation = value;
-            } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
-                setSnapshotId = value;
-            } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
-                pickSnapshotId = value;
-            } else {
-                setProperties.put(key, value);
-            }
-        }
-
-        oldProperties.keySet().forEach(k -> {
-            if (!newTable.getOptions().containsKey(k)) {
-                setProperties.put(k, null);
-            }
-        });
-
-        commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
-    }
-
-    @Override
-    public Optional<Factory> getFactory() {
-        return Optional.of(new FlinkDynamicTableFactory(this));
-    }
-
-    CatalogLoader getCatalogLoader() {
-        return catalogLoader;
-    }
-
-    // ------------------------------ Unsupported methods ---------------------------------------------
-
-    @Override
-    public List<String> listViews(String databaseName) throws CatalogException {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-            throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition,
-            boolean ignoreIfExists) throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
-            throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition,
-            boolean ignoreIfNotExists) throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public List<String> listFunctions(String dbName) throws CatalogException {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
-        throw new FunctionNotExistException(getName(), functionPath);
-    }
-
-    @Override
-    public boolean functionExists(ObjectPath functionPath) throws CatalogException {
-        return false;
-    }
-
-    @Override
-    public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
-            throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
-            throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
-            throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics,
-            boolean ignoreIfNotExists) throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics,
-            boolean ignoreIfNotExists) throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
-            CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
-            CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
-            throws TableNotExistException, TableNotPartitionedException, CatalogException {
-        Table table = loadIcebergTable(tablePath);
-
-        if (table.spec().isUnpartitioned()) {
-            throw new TableNotPartitionedException(icebergCatalog.name(), tablePath);
-        }
-
-        Set<CatalogPartitionSpec> set = Sets.newHashSet();
-        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
-            for (DataFile dataFile : CloseableIterable.transform(tasks, FileScanTask::file)) {
-                Map<String, String> map = Maps.newHashMap();
-                StructLike structLike = dataFile.partition();
-                PartitionSpec spec = table.specs().get(dataFile.specId());
-                for (int i = 0; i < structLike.size(); i++) {
-                    map.put(spec.fields().get(i).name(), String.valueOf(structLike.get(i, Object.class)));
-                }
-                set.add(new CatalogPartitionSpec(map));
-            }
-        } catch (IOException e) {
-            throw new CatalogException(String.format("Failed to list partitions of table %s", tablePath), e);
-        }
-
-        return Lists.newArrayList(set);
-    }
-
-    @Override
-    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-            throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
-            throws CatalogException {
-        throw new UnsupportedOperationException();
-    }
-
-    // After partition pruning and filter push-down, the statistics here become very inaccurate, so they are of
-    // little significance.
-    // Flink will support something like SupportsReportStatistics in the future.
-
-    @Override
-    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
-            throws CatalogException {
-        return CatalogTableStatistics.UNKNOWN;
-    }
-
-    @Override
-    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
-            throws CatalogException {
-        return CatalogColumnStatistics.UNKNOWN;
-    }
-
-    @Override
-    public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
-            throws CatalogException {
-        return CatalogTableStatistics.UNKNOWN;
-    }
-
-    @Override
-    public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath,
-            CatalogPartitionSpec partitionSpec)
-            throws CatalogException {
-        return CatalogColumnStatistics.UNKNOWN;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.util.StringUtils;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}.
+ * <p>
+ * The mapping between a Flink database and an Iceberg namespace:
+ * a base namespace is supplied for a given catalog, so if the catalog supports a 2-level namespace, the first level
+ * is supplied in the catalog configuration and the second level is exposed as Flink databases.
+ * <p>
+ * An Iceberg table manages its partitions by itself; its partitioning is independent of Flink's partitioning.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+public class FlinkCatalog extends AbstractCatalog {
+
+    private final CatalogLoader catalogLoader;
+    private final Catalog icebergCatalog;
+    private final Namespace baseNamespace;
+    private final SupportsNamespaces asNamespaceCatalog;
+    private final Closeable closeable;
+    private final boolean cacheEnabled;
+
+    public FlinkCatalog(
+            String catalogName,
+            String defaultDatabase,
+            Namespace baseNamespace,
+            CatalogLoader catalogLoader,
+            boolean cacheEnabled) {
+        super(catalogName, defaultDatabase);
+        this.catalogLoader = catalogLoader;
+        this.baseNamespace = baseNamespace;
+        this.cacheEnabled = cacheEnabled;
+
+        Catalog originalCatalog = catalogLoader.loadCatalog();
+        icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
+        asNamespaceCatalog = originalCatalog instanceof SupportsNamespaces
+                ? (SupportsNamespaces) originalCatalog : null;
+        closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        // Create the default database if it does not exist.
+        try {
+            createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
+        } catch (DatabaseAlreadyExistException e) {
+            // Ignore the exception if the database already exists.
+        }
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (IOException e) {
+                throw new CatalogException(e);
+            }
+        }
+    }
+
+    public Catalog catalog() {
+        return icebergCatalog;
+    }
+
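+    /** Resolves a Flink database name to an Iceberg namespace by appending it to the configured base namespace. */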
+    private Namespace toNamespace(String database) {
+        String[] namespace = new String[baseNamespace.levels().length + 1];
+        System.arraycopy(baseNamespace.levels(), 0, namespace, 0, baseNamespace.levels().length);
+        namespace[baseNamespace.levels().length] = database;
+        return Namespace.of(namespace);
+    }
+
+    TableIdentifier toIdentifier(ObjectPath path) {
+        return TableIdentifier.of(toNamespace(path.getDatabaseName()), path.getObjectName());
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        if (asNamespaceCatalog == null) {
+            return Collections.singletonList(getDefaultDatabase());
+        }
+
+        return asNamespaceCatalog.listNamespaces(baseNamespace).stream()
+                .map(n -> n.level(n.levels().length - 1))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+        if (asNamespaceCatalog == null) {
+            if (!getDefaultDatabase().equals(databaseName)) {
+                throw new DatabaseNotExistException(getName(), databaseName);
+            } else {
+                return new CatalogDatabaseImpl(Maps.newHashMap(), "");
+            }
+        } else {
+            try {
+                Map<String, String> metadata =
+                        Maps.newHashMap(asNamespaceCatalog.loadNamespaceMetadata(toNamespace(databaseName)));
+                String comment = metadata.remove("comment");
+                return new CatalogDatabaseImpl(metadata, comment);
+            } catch (NoSuchNamespaceException e) {
+                throw new DatabaseNotExistException(getName(), databaseName, e);
+            }
+        }
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        try {
+            getDatabase(databaseName);
+            return true;
+        } catch (DatabaseNotExistException ignore) {
+            return false;
+        }
+    }
+
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        createDatabase(name, mergeComment(database.getProperties(), database.getComment()), ignoreIfExists);
+    }
+
+    private void createDatabase(String databaseName, Map<String, String> metadata, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        if (asNamespaceCatalog != null) {
+            try {
+                asNamespaceCatalog.createNamespace(toNamespace(databaseName), metadata);
+            } catch (AlreadyExistsException e) {
+                if (!ignoreIfExists) {
+                    throw new DatabaseAlreadyExistException(getName(), databaseName, e);
+                }
+            }
+        } else {
+            throw new UnsupportedOperationException("Namespaces are not supported by catalog: " + getName());
+        }
+    }
+
+    private Map<String, String> mergeComment(Map<String, String> metadata, String comment) {
+        Map<String, String> ret = Maps.newHashMap(metadata);
+        if (metadata.containsKey("comment")) {
+            throw new CatalogException("Database properties should not contain key: 'comment'.");
+        }
+
+        if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+            ret.put("comment", comment);
+        }
+        return ret;
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+        if (asNamespaceCatalog != null) {
+            try {
+                boolean success = asNamespaceCatalog.dropNamespace(toNamespace(name));
+                if (!success && !ignoreIfNotExists) {
+                    throw new DatabaseNotExistException(getName(), name);
+                }
+            } catch (NoSuchNamespaceException e) {
+                if (!ignoreIfNotExists) {
+                    throw new DatabaseNotExistException(getName(), name, e);
+                }
+            } catch (NamespaceNotEmptyException e) {
+                throw new DatabaseNotEmptyException(getName(), name, e);
+            }
+        } else {
+            if (!ignoreIfNotExists) {
+                throw new DatabaseNotExistException(getName(), name);
+            }
+        }
+    }
+
+    @Override
+    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        if (asNamespaceCatalog != null) {
+            Namespace namespace = toNamespace(name);
+            Map<String, String> updates = Maps.newHashMap();
+            Set<String> removals = Sets.newHashSet();
+
+            try {
+                Map<String, String> oldProperties = asNamespaceCatalog.loadNamespaceMetadata(namespace);
+                Map<String, String> newProperties = mergeComment(newDatabase.getProperties(), newDatabase.getComment());
+
+                for (String key : oldProperties.keySet()) {
+                    if (!newProperties.containsKey(key)) {
+                        removals.add(key);
+                    }
+                }
+
+                for (Map.Entry<String, String> entry : newProperties.entrySet()) {
+                    if (!entry.getValue().equals(oldProperties.get(entry.getKey()))) {
+                        updates.put(entry.getKey(), entry.getValue());
+                    }
+                }
+
+                if (!updates.isEmpty()) {
+                    asNamespaceCatalog.setProperties(namespace, updates);
+                }
+
+                if (!removals.isEmpty()) {
+                    asNamespaceCatalog.removeProperties(namespace, removals);
+                }
+
+            } catch (NoSuchNamespaceException e) {
+                if (!ignoreIfNotExists) {
+                    throw new DatabaseNotExistException(getName(), name, e);
+                }
+            }
+        } else {
+            if (getDefaultDatabase().equals(name)) {
+                throw new CatalogException(
+                        "Can not alter the default database when the iceberg catalog doesn't support namespaces.");
+            }
+            if (!ignoreIfNotExists) {
+                throw new DatabaseNotExistException(getName(), name);
+            }
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+        try {
+            return icebergCatalog.listTables(toNamespace(databaseName)).stream()
+                    .map(TableIdentifier::name)
+                    .collect(Collectors.toList());
+        } catch (NoSuchNamespaceException e) {
+            throw new DatabaseNotExistException(getName(), databaseName, e);
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+        Table table = loadIcebergTable(tablePath);
+        return toCatalogTable(table);
+    }
+
+    private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
+        try {
+            Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
+            if (cacheEnabled) {
+                table.refresh();
+            }
+
+            return table;
+        } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+            throw new TableNotExistException(getName(), tablePath, e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return icebergCatalog.tableExists(toIdentifier(tablePath));
+    }
+
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        try {
+            icebergCatalog.dropTable(toIdentifier(tablePath));
+        } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+            if (!ignoreIfNotExists) {
+                throw new TableNotExistException(getName(), tablePath, e);
+            }
+        }
+    }
+
+    @Override
+    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException, CatalogException {
+        try {
+            icebergCatalog.renameTable(
+                    toIdentifier(tablePath),
+                    toIdentifier(new ObjectPath(tablePath.getDatabaseName(), newTableName)));
+        } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+            if (!ignoreIfNotExists) {
+                throw new TableNotExistException(getName(), tablePath, e);
+            }
+        } catch (AlreadyExistsException e) {
+            throw new TableAlreadyExistException(getName(), tablePath, e);
+        }
+    }
+
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+            throws CatalogException, TableAlreadyExistException {
+        if (Objects.equals(table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
+            throw new IllegalArgumentException("Cannot create the table with 'connector'='iceberg' table property in "
+                    + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg "
+                    + "catalog or create table without 'connector'='iceberg' related properties in an iceberg table.");
+        }
+
+        createIcebergTable(tablePath, table, ignoreIfExists);
+    }
+
+    void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+            throws CatalogException, TableAlreadyExistException {
+        validateFlinkTable(table);
+
+        Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+        PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+
+        ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+        String location = null;
+        for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+            if ("location".equalsIgnoreCase(entry.getKey())) {
+                location = entry.getValue();
+            } else {
+                properties.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        try {
+            icebergCatalog.createTable(
+                    toIdentifier(tablePath),
+                    icebergSchema,
+                    spec,
+                    location,
+                    properties.build());
+        } catch (AlreadyExistsException e) {
+            if (!ignoreIfExists) {
+                throw new TableAlreadyExistException(getName(), tablePath, e);
+            }
+        }
+    }
+
+    @Override
+    public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+            throws CatalogException, TableNotExistException {
+        validateFlinkTable(newTable);
+
+        Table icebergTable;
+        try {
+            icebergTable = loadIcebergTable(tablePath);
+        } catch (TableNotExistException e) {
+            if (!ignoreIfNotExists) {
+                throw e;
+            } else {
+                return;
+            }
+        }
+
+        CatalogTable table = toCatalogTable(icebergTable);
+
+        // Currently, Flink SQL only supports altering table properties.
+
+        // With the current Flink Catalog API, support for adding/removing/renaming columns cannot be done by comparing
+        // CatalogTable instances, unless the Flink schema contains Iceberg column IDs.
+        if (!table.getSchema().equals(newTable.getSchema())) {
+            throw new UnsupportedOperationException("Altering schema is not supported yet.");
+        }
+
+        if (!table.getPartitionKeys().equals(((CatalogTable) newTable).getPartitionKeys())) {
+            throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
+        }
+
+        Map<String, String> oldProperties = table.getOptions();
+        Map<String, String> setProperties = Maps.newHashMap();
+
+        String setLocation = null;
+        String setSnapshotId = null;
+        String pickSnapshotId = null;
+
+        for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+
+            if (Objects.equals(value, oldProperties.get(key))) {
+                continue;
+            }
+
+            if ("location".equalsIgnoreCase(key)) {
+                setLocation = value;
+            } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+                setSnapshotId = value;
+            } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+                pickSnapshotId = value;
+            } else {
+                setProperties.put(key, value);
+            }
+        }
+
+        oldProperties.keySet().forEach(k -> {
+            if (!newTable.getOptions().containsKey(k)) {
+                setProperties.put(k, null);
+            }
+        });
+
+        commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
+    }
+
+    private static void validateFlinkTable(CatalogBaseTable table) {
+        Preconditions.checkArgument(table instanceof CatalogTable, "The Table should be a CatalogTable.");
+
+        TableSchema schema = table.getSchema();
+        schema.getTableColumns().forEach(column -> {
+            if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
+                throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
+            }
+        });
+
+        if (!schema.getWatermarkSpecs().isEmpty()) {
+            throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");
+        }
+    }
+
+    private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {
+        PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
+        partitionKeys.forEach(builder::identity);
+        return builder.build();
+    }
+
+    private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {
+        List<String> partitionKeys = Lists.newArrayList();
+        for (PartitionField field : spec.fields()) {
+            if (field.transform().isIdentity()) {
+                partitionKeys.add(icebergSchema.findColumnName(field.sourceId()));
+            } else {
+                // Not created by Flink SQL.
+                // For compatibility with Iceberg tables, return an empty list.
+                // TODO: modify this after Flink supports partition transforms.
+                return Collections.emptyList();
+            }
+        }
+        return partitionKeys;
+    }
+
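+    /**
+     * Applies the property changes collected in {@link #alterTable}: optionally set the current snapshot or
+     * cherry-pick a snapshot first, then update the location and the remaining table properties in one transaction.
+     * A property whose new value is null is removed from the table.
+     */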
+    private static void commitChanges(Table table, String setLocation, String setSnapshotId,
+            String pickSnapshotId, Map<String, String> setProperties) {
+        // don't allow setting the snapshot and picking a commit at the same time because order is ambiguous and
+        // choosing one order leads to different results
+        Preconditions.checkArgument(setSnapshotId == null || pickSnapshotId == null,
+                "Cannot set the current snapshot ID and cherry-pick snapshot changes");
+
+        if (setSnapshotId != null) {
+            long newSnapshotId = Long.parseLong(setSnapshotId);
+            table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
+        }
+
+        // if updating the table snapshot, perform that update first in case it fails
+        if (pickSnapshotId != null) {
+            long newSnapshotId = Long.parseLong(pickSnapshotId);
+            table.manageSnapshots().cherrypick(newSnapshotId).commit();
+        }
+
+        Transaction transaction = table.newTransaction();
+
+        if (setLocation != null) {
+            transaction.updateLocation()
+                    .setLocation(setLocation)
+                    .commit();
+        }
+
+        if (!setProperties.isEmpty()) {
+            UpdateProperties updateProperties = transaction.updateProperties();
+            setProperties.forEach((k, v) -> {
+                if (v == null) {
+                    updateProperties.remove(k);
+                } else {
+                    updateProperties.set(k, v);
+                }
+            });
+            updateProperties.commit();
+        }
+
+        transaction.commitTransaction();
+    }
+
+    static CatalogTable toCatalogTable(Table table) {
+        TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
+        List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
+
+        // NOTE: We cannot create an IcebergCatalogTable that extends CatalogTable, because the Flink optimizer may
+        // use CatalogTableImpl to copy a new catalog table.
+        // Instead, re-load the table from the Iceberg catalog when creating source/sink operators.
+        // Iceberg does not have a table comment, so pass null (the default comment value in Flink).
+        return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
+    }
+
+    @Override
+    public Optional<Factory> getFactory() {
+        return Optional.of(new FlinkDynamicTableFactory(this));
+    }
+
+    CatalogLoader getCatalogLoader() {
+        return catalogLoader;
+    }
+
+    // ------------------------------ Unsupported methods ---------------------------------------------
+
+    @Override
+    public List<String> listViews(String databaseName) throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition,
+            boolean ignoreIfExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition,
+            boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> listFunctions(String dbName) throws CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+        throw new FunctionNotExistException(getName(), functionPath);
+    }
+
+    @Override
+    public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+        return false;
+    }
+
+    @Override
+    public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics,
+            boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
+            CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
+            CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        Table table = loadIcebergTable(tablePath);
+
+        if (table.spec().isUnpartitioned()) {
+            throw new TableNotPartitionedException(icebergCatalog.name(), tablePath);
+        }
+
+        Set<CatalogPartitionSpec> set = Sets.newHashSet();
+        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+            for (DataFile dataFile : CloseableIterable.transform(tasks, FileScanTask::file)) {
+                Map<String, String> map = Maps.newHashMap();
+                StructLike structLike = dataFile.partition();
+                PartitionSpec spec = table.specs().get(dataFile.specId());
+                for (int i = 0; i < structLike.size(); i++) {
+                    map.put(spec.fields().get(i).name(), String.valueOf(structLike.get(i, Object.class)));
+                }
+                set.add(new CatalogPartitionSpec(map));
+            }
+        } catch (IOException e) {
+            throw new CatalogException(String.format("Failed to list partitions of table %s", tablePath), e);
+        }
+
+        return Lists.newArrayList(set);
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
+            throws CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    // After partition pruning and filter push-down, the statistics here become very inaccurate, so they are of
+    // little significance.
+    // Flink will support something like SupportsReportStatistics in the future.
+
+    @Override
+    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+            throws CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+            throws CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogColumnStatistics getPartitionColumnStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+}
+
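The catalog above maps each Flink database onto one extra level under the configured base namespace (see toNamespace). A minimal sketch of that mapping, assuming a hypothetical base namespace "prod.warehouse" and a Flink database named "sales" (both names are illustrative, not taken from this commit):

    import org.apache.iceberg.catalog.Namespace;

    public class NamespaceMappingSketch {
        public static void main(String[] args) {
            // Hypothetical catalog configuration: base-namespace=prod.warehouse
            Namespace base = Namespace.of("prod", "warehouse");

            // FlinkCatalog#toNamespace appends the Flink database as one more namespace level.
            String database = "sales";
            String[] levels = new String[base.levels().length + 1];
            System.arraycopy(base.levels(), 0, levels, 0, base.levels().length);
            levels[base.levels().length] = database;
            Namespace resolved = Namespace.of(levels);

            // Flink database "sales" is exposed as the Iceberg namespace prod.warehouse.sales
            System.out.println(resolved);
        }
    }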
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalogFactory.java
similarity index 91%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalogFactory.java
index f4bb7d49f..3ef8405c6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalogFactory.java
@@ -1,166 +1,167 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.inlong.sort.iceberg;
-
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.util.HadoopUtils;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.factories.CatalogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.flink.CatalogLoader;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.base.Strings;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-/**
- * A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
- * <p>
- * This supports the following catalog configuration options:
- * <ul>
- *   <li><code>type</code> - Flink catalog factory key, should be "iceberg"</li>
- *   <li><code>catalog-type</code> - iceberg catalog type, "hive" or "hadoop"</li>
- *   <li><code>uri</code> - the Hive Metastore URI (Hive catalog only)</li>
- *   <li><code>clients</code> - the Hive Client Pool Size (Hive catalog only)</li>
- *   <li><code>warehouse</code> - the warehouse path (Hadoop catalog only)</li>
- *   <li><code>default-database</code> - a database name to use as the default</li>
- *   <li><code>base-namespace</code> - a base namespace as the prefix for all databases (Hadoop catalog only)</li>
- *   <li><code>cache-enabled</code> - whether to enable catalog cache</li>
- * </ul>
- * </p>
- * To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override
- * {@link #createCatalogLoader(String, Map, Configuration)}.
- */
-public class FlinkCatalogFactory implements CatalogFactory {
-
-    // Can not just use "type", it conflicts with CATALOG_TYPE.
-    public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
-    public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
-    public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
-
-    public static final String HIVE_CONF_DIR = "hive-conf-dir";
-    public static final String DEFAULT_DATABASE = "default-database";
-    public static final String DEFAULT_DATABASE_NAME = "default";
-    public static final String BASE_NAMESPACE = "base-namespace";
-    public static final String CACHE_ENABLED = "cache-enabled";
-
-    public static final String TYPE = "type";
-    public static final String PROPERTY_VERSION = "property-version";
-
-    /**
-     * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink catalog adapter.
-     *
-     * @param name Flink's catalog name
-     * @param properties Flink's catalog properties
-     * @param hadoopConf Hadoop configuration for catalog
-     * @return an Iceberg catalog loader
-     */
-    static CatalogLoader createCatalogLoader(String name, Map<String, String> properties, Configuration hadoopConf) {
-        String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL);
-        if (catalogImpl != null) {
-            String catalogType = properties.get(ICEBERG_CATALOG_TYPE);
-            Preconditions.checkArgument(catalogType == null,
-                    "Cannot create catalog %s, both catalog-type and "
-                            + "catalog-impl are set: catalog-type=%s, catalog-impl=%s",
-                    name, catalogType, catalogImpl);
-            return CatalogLoader.custom(name, properties, hadoopConf, catalogImpl);
-        }
-
-        String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
-        switch (catalogType.toLowerCase(Locale.ENGLISH)) {
-            case ICEBERG_CATALOG_TYPE_HIVE:
-                // The values of the properties 'uri', 'warehouse' and
-                // 'hive-conf-dir' are allowed to be null; in that case the catalog
-                // falls back to parsing those values from the Hadoop configuration on the classpath.
-                String hiveConfDir = properties.get(HIVE_CONF_DIR);
-                Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir);
-                return CatalogLoader.hive(name, newHadoopConf, properties);
-
-            case ICEBERG_CATALOG_TYPE_HADOOP:
-                return CatalogLoader.hadoop(name, hadoopConf, properties);
-
-            default:
-                throw new UnsupportedOperationException("Unknown catalog-type: " + catalogType
-                        + " (Must be 'hive' or 'hadoop')");
-        }
-    }
-
-    private static Configuration mergeHiveConf(Configuration hadoopConf, String hiveConfDir) {
-        Configuration newConf = new Configuration(hadoopConf);
-        if (!Strings.isNullOrEmpty(hiveConfDir)) {
-            Preconditions.checkState(Files.exists(Paths.get(hiveConfDir, "hive-site.xml")),
-                    "There should be a hive-site.xml file under the directory %s", hiveConfDir);
-            newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));
-        } else {
-            // If the hive-site.xml path is not provided explicitly, it will try to load the resource from the
-            // classpath. If the configuration file still cannot be loaded, HiveCatalog will throw an exception.
-            URL configFile = CatalogLoader.class.getClassLoader().getResource("hive-site.xml");
-            if (configFile != null) {
-                newConf.addResource(configFile);
-            }
-        }
-        return newConf;
-    }
-
-    public static Configuration clusterHadoopConf() {
-        return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
-    }
-
-    @Override
-    public Map<String, String> requiredContext() {
-        Map<String, String> context = Maps.newHashMap();
-        context.put(TYPE, "iceberg");
-        context.put(PROPERTY_VERSION, "1");
-        return context;
-    }
-
-    @Override
-    public List<String> supportedProperties() {
-        return ImmutableList.of("*");
-    }
-
-    @Override
-    public Catalog createCatalog(String name, Map<String, String> properties) {
-        return createCatalog(name, properties, clusterHadoopConf());
-    }
-
-    protected Catalog createCatalog(String name, Map<String, String> properties, Configuration hadoopConf) {
-        CatalogLoader catalogLoader = createCatalogLoader(name, properties, hadoopConf);
-        String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, DEFAULT_DATABASE_NAME);
-
-        Namespace baseNamespace = Namespace.empty();
-        if (properties.containsKey(BASE_NAMESPACE)) {
-            baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
-        }
-
-        boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
-        return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink;
+
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
+ * <p>
+ * This supports the following catalog configuration options:
+ * <ul>
+ *   <li><code>type</code> - Flink catalog factory key, should be "iceberg"</li>
+ *   <li><code>catalog-type</code> - iceberg catalog type, "hive" or "hadoop"</li>
+ *   <li><code>uri</code> - the Hive Metastore URI (Hive catalog only)</li>
+ *   <li><code>clients</code> - the Hive Client Pool Size (Hive catalog only)</li>
+ *   <li><code>warehouse</code> - the warehouse path (Hadoop catalog only)</li>
+ *   <li><code>default-database</code> - a database name to use as the default</li>
+ *   <li><code>base-namespace</code> - a base namespace as the prefix for all databases (Hadoop catalog only)</li>
+ *   <li><code>cache-enabled</code> - whether to enable catalog cache</li>
+ * </ul>
+ * <p>
+ * To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override
+ * {@link #createCatalogLoader(String, Map, Configuration)}.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+public class FlinkCatalogFactory implements CatalogFactory {
+
+    // Can not just use "type", it conflicts with CATALOG_TYPE.
+    public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
+    public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+    public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+
+    public static final String HIVE_CONF_DIR = "hive-conf-dir";
+    public static final String DEFAULT_DATABASE = "default-database";
+    public static final String DEFAULT_DATABASE_NAME = "default";
+    public static final String BASE_NAMESPACE = "base-namespace";
+    public static final String CACHE_ENABLED = "cache-enabled";
+
+    public static final String TYPE = "type";
+    public static final String PROPERTY_VERSION = "property-version";
+
+    /**
+     * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink catalog adapter.
+     *
+     * @param name       Flink's catalog name
+     * @param properties Flink's catalog properties
+     * @param hadoopConf Hadoop configuration for catalog
+     * @return an Iceberg catalog loader
+     */
+    static CatalogLoader createCatalogLoader(String name, Map<String, String> properties, Configuration hadoopConf) {
+        String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL);
+        if (catalogImpl != null) {
+            String catalogType = properties.get(ICEBERG_CATALOG_TYPE);
+            Preconditions.checkArgument(catalogType == null,
+                    "Cannot create catalog %s, both catalog-type and catalog-impl are set: "
+                            + "catalog-type=%s, catalog-impl=%s",
+                    name, catalogType, catalogImpl);
+            return CatalogLoader.custom(name, properties, hadoopConf, catalogImpl);
+        }
+
+        String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
+        switch (catalogType.toLowerCase(Locale.ENGLISH)) {
+            case ICEBERG_CATALOG_TYPE_HIVE:
+                // The values of the properties 'uri', 'warehouse' and 'hive-conf-dir' are allowed to be null; in that
+                // case the catalog falls back to parsing those values from the Hadoop configuration on the classpath.
+                String hiveConfDir = properties.get(HIVE_CONF_DIR);
+                Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir);
+                return CatalogLoader.hive(name, newHadoopConf, properties);
+
+            case ICEBERG_CATALOG_TYPE_HADOOP:
+                return CatalogLoader.hadoop(name, hadoopConf, properties);
+
+            default:
+                throw new UnsupportedOperationException("Unknown catalog-type: " + catalogType
+                        + " (Must be 'hive' or 'hadoop')");
+        }
+    }
+
+    @Override
+    public Map<String, String> requiredContext() {
+        Map<String, String> context = Maps.newHashMap();
+        context.put(TYPE, "iceberg");
+        context.put(PROPERTY_VERSION, "1");
+        return context;
+    }
+
+    @Override
+    public List<String> supportedProperties() {
+        return ImmutableList.of("*");
+    }
+
+    @Override
+    public Catalog createCatalog(String name, Map<String, String> properties) {
+        return createCatalog(name, properties, clusterHadoopConf());
+    }
+
+    protected Catalog createCatalog(String name, Map<String, String> properties, Configuration hadoopConf) {
+        CatalogLoader catalogLoader = createCatalogLoader(name, properties, hadoopConf);
+        String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, DEFAULT_DATABASE_NAME);
+
+        Namespace baseNamespace = Namespace.empty();
+        if (properties.containsKey(BASE_NAMESPACE)) {
+            baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
+        }
+
+        boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
+        return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
+    }
+
+    private static Configuration mergeHiveConf(Configuration hadoopConf, String hiveConfDir) {
+        Configuration newConf = new Configuration(hadoopConf);
+        if (!Strings.isNullOrEmpty(hiveConfDir)) {
+            Preconditions.checkState(Files.exists(Paths.get(hiveConfDir, "hive-site.xml")),
+                    "There should be a hive-site.xml file under the directory %s", hiveConfDir);
+            newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));
+        } else {
+            // If the hive-site.xml path is not provided explicitly, it will try to load the resource from the
+            // classpath. If the configuration file still cannot be loaded, HiveCatalog will throw an exception.
+            URL configFile = CatalogLoader.class.getClassLoader().getResource("hive-site.xml");
+            if (configFile != null) {
+                newConf.addResource(configFile);
+            }
+        }
+        return newConf;
+    }
+
+    public static Configuration clusterHadoopConf() {
+        return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
+    }
+}
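The factory above is discovered through Flink's legacy CatalogFactory mechanism (requiredContext: type=iceberg, property-version=1). A minimal registration sketch using only the options documented in the javadoc above; the catalog name, warehouse path and database are placeholders, not values from this commit:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class IcebergCatalogRegistrationSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // Registers an Iceberg catalog backed by a Hadoop warehouse; the property keys
            // follow FlinkCatalogFactory above, while the concrete values are hypothetical.
            tEnv.executeSql(
                    "CREATE CATALOG iceberg_catalog WITH ("
                            + " 'type'='iceberg',"
                            + " 'catalog-type'='hadoop',"
                            + " 'warehouse'='file:///tmp/iceberg/warehouse',"
                            + " 'default-database'='default',"
                            + " 'cache-enabled'='true'"
                            + ")");
            tEnv.executeSql("USE CATALOG iceberg_catalog");
        }
    }

With the catalog registered, createCatalog resolves the catalog-type (hive, hadoop, or a custom catalog-impl) and wraps the loaded Iceberg catalog in the FlinkCatalog shown earlier.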
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
similarity index 78%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
index e10626148..35d3a6c76 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
@@ -1,222 +1,233 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.inlong.sort.iceberg;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.factories.DynamicTableSinkFactory;
-import org.apache.flink.table.factories.DynamicTableSourceFactory;
-import org.apache.flink.table.utils.TableSchemaUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.common.DynMethods;
-import org.apache.iceberg.exceptions.AlreadyExistsException;
-import org.apache.iceberg.flink.IcebergTableSource;
-import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
-import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
-
-/**
- * Copy from org.apache.iceberg.flink:iceberg-flink-runtime-1.13:0.13.1
- *
- * <p>
- * Factory for creating configured instances of {@link IcebergTableSource} and {@link
- * IcebergTableSink}.We modify KafkaDynamicTableSink to support append-mode .
- * </p>
- */
-public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
-
-    static final String FACTORY_IDENTIFIER = "iceberg-inlong";
-
-    private static final ConfigOption<String> CATALOG_NAME =
-            ConfigOptions.key("catalog-name")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Catalog name");
-
-    private static final ConfigOption<String> CATALOG_TYPE =
-            ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
-
-    private static final ConfigOption<String> CATALOG_DATABASE =
-            ConfigOptions.key("catalog-database")
-                    .stringType()
-                    .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
-                    .withDescription("Database name managed in the iceberg catalog.");
-
-    private static final ConfigOption<String> CATALOG_TABLE =
-            ConfigOptions.key("catalog-table")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Table name managed in the underlying iceberg catalog and database.");
-
-    // Flink 1.13.x change the return type from CatalogTable interface to ResolvedCatalogTable which extends the
-    // CatalogTable. Here we use the dynamic method loading approach to avoid adding explicit CatalogTable or
-    // ResolvedCatalogTable class into the iceberg-flink-runtime jar for compatibility purpose.
-    private static final DynMethods.UnboundMethod GET_CATALOG_TABLE = DynMethods.builder("getCatalogTable")
-            .impl(Context.class, "getCatalogTable")
-            .orNoop()
-            .build();
-
-    private final FlinkCatalog catalog;
-
-    public FlinkDynamicTableFactory() {
-        this.catalog = null;
-    }
-
-    public FlinkDynamicTableFactory(FlinkCatalog catalog) {
-        this.catalog = catalog;
-    }
-
-    private static CatalogTable loadCatalogTable(Context context) {
-        return GET_CATALOG_TABLE.invoke(context);
-    }
-
-    private static TableLoader createTableLoader(CatalogBaseTable catalogBaseTable,
-            Map<String, String> tableProps,
-            String databaseName,
-            String tableName) {
-        Configuration flinkConf = new Configuration();
-        tableProps.forEach(flinkConf::setString);
-
-        String catalogName = flinkConf.getString(CATALOG_NAME);
-        Preconditions.checkNotNull(catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
-
-        String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
-        Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");
-
-        String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
-        Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");
-
-        org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
-        FlinkCatalogFactory factory = new FlinkCatalogFactory();
-        FlinkCatalog flinkCatalog = (FlinkCatalog) factory.createCatalog(catalogName, tableProps, hadoopConf);
-        ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);
-
-        // Create database if not exists in the external catalog.
-        if (!flinkCatalog.databaseExists(catalogDatabase)) {
-            try {
-                flinkCatalog.createDatabase(catalogDatabase, new CatalogDatabaseImpl(Maps.newHashMap(), null), true);
-            } catch (DatabaseAlreadyExistException e) {
-                throw new AlreadyExistsException(e, "Database %s already exists in the iceberg catalog %s.",
-                        catalogName,
-                        catalogDatabase);
-            }
-        }
-
-        // Create table if not exists in the external catalog.
-        if (!flinkCatalog.tableExists(objectPath)) {
-            try {
-                flinkCatalog.createIcebergTable(objectPath, catalogBaseTable, true);
-            } catch (TableAlreadyExistException e) {
-                throw new AlreadyExistsException(e, "Table %s already exists in the database %s and catalog %s",
-                        catalogTable, catalogDatabase, catalogName);
-            }
-        }
-
-        return TableLoader.fromCatalog(flinkCatalog.getCatalogLoader(),
-                TableIdentifier.of(catalogDatabase, catalogTable));
-    }
-
-    private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {
-        Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
-        return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
-    }
-
-    @Override
-    public DynamicTableSource createDynamicTableSource(Context context) {
-        ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
-        CatalogTable catalogTable = loadCatalogTable(context);
-        Map<String, String> tableProps = catalogTable.getOptions();
-        TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
-
-        TableLoader tableLoader;
-        if (catalog != null) {
-            tableLoader = createTableLoader(catalog, objectIdentifier.toObjectPath());
-        } else {
-            tableLoader = createTableLoader(catalogTable, tableProps, objectIdentifier.getDatabaseName(),
-                    objectIdentifier.getObjectName());
-        }
-
-        return new IcebergTableSource(tableLoader, tableSchema, tableProps, context.getConfiguration());
-    }
-
-    @Override
-    public DynamicTableSink createDynamicTableSink(Context context) {
-        ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
-        CatalogTable catalogTable = loadCatalogTable(context);
-        Map<String, String> tableProps = catalogTable.getOptions();
-        TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
-
-        TableLoader tableLoader;
-        if (catalog != null) {
-            tableLoader = createTableLoader(catalog, objectPath);
-        } else {
-            tableLoader = createTableLoader(catalogTable, tableProps, objectPath.getDatabaseName(),
-                    objectPath.getObjectName());
-        }
-
-        return new IcebergTableSink(tableLoader, tableSchema, catalogTable);
-    }
-
-    @Override
-    public Set<ConfigOption<?>> requiredOptions() {
-        Set<ConfigOption<?>> options = Sets.newHashSet();
-        options.add(CATALOG_TYPE);
-        options.add(CATALOG_NAME);
-        return options;
-    }
-
-    @Override
-    public Set<ConfigOption<?>> optionalOptions() {
-        Set<ConfigOption<?>> options = Sets.newHashSet();
-        options.add(CATALOG_DATABASE);
-        options.add(CATALOG_TABLE);
-        options.add(ICEBERG_IGNORE_ALL_CHANGELOG);
-        options.add(INLONG_METRIC);
-        options.add(INLONG_AUDIT);
-        return options;
-    }
-
-    @Override
-    public String factoryIdentifier() {
-        return FACTORY_IDENTIFIER;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.IcebergTableSource;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesActionOption;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
+    static final String FACTORY_IDENTIFIER = "dlc-inlong";
+
+    public static final ConfigOption<String> CATALOG_NAME =
+            ConfigOptions.key("catalog-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Catalog name");
+
+    public static final ConfigOption<String> CATALOG_TYPE =
+            ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
+
+    public static final ConfigOption<String> CATALOG_DATABASE =
+            ConfigOptions.key("catalog-database")
+                    .stringType()
+                    .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+                    .withDescription("Database name managed in the iceberg catalog.");
+
+    public static final ConfigOption<String> CATALOG_TABLE =
+            ConfigOptions.key("catalog-table")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Table name managed in the underlying iceberg catalog and database.");
+
+    public static final ConfigOption<Boolean> ICEBERG_IGNORE_ALL_CHANGELOG =
+            ConfigOptions.key("sink.ignore.changelog")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Regard upsert/delete records as insert-only records.");
+
+    // Flink 1.13.x change the return type from CatalogTable interface to ResolvedCatalogTable which extends the
+    // CatalogTable. Here we use the dynamic method loading approach to avoid adding explicit CatalogTable or
+    // ResolvedCatalogTable class into the iceberg-flink-runtime jar for compatibility purpose.
+    private static final DynMethods.UnboundMethod GET_CATALOG_TABLE = DynMethods.builder("getCatalogTable")
+            .impl(Context.class, "getCatalogTable")
+            .orNoop()
+            .build();
+
+    private final FlinkCatalog catalog;
+
+    public FlinkDynamicTableFactory() {
+        this.catalog = null;
+    }
+
+    public FlinkDynamicTableFactory(FlinkCatalog catalog) {
+        this.catalog = catalog;
+    }
+
+    private static CatalogTable loadCatalogTable(Context context) {
+        return GET_CATALOG_TABLE.invoke(context);
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
+        CatalogTable catalogTable = loadCatalogTable(context);
+        Map<String, String> tableProps = catalogTable.getOptions();
+        TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+
+        TableLoader tableLoader;
+        if (catalog != null) {
+            tableLoader = createTableLoader(catalog, objectIdentifier.toObjectPath());
+        } else {
+            tableLoader = createTableLoader(catalogTable, tableProps, objectIdentifier.getDatabaseName(),
+                    objectIdentifier.getObjectName());
+        }
+
+        return new IcebergTableSource(tableLoader, tableSchema, tableProps, context.getConfiguration());
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
+        CatalogTable catalogTable = loadCatalogTable(context);
+        Map<String, String> tableProps = catalogTable.getOptions();
+        TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+        SyncRewriteDataFilesActionOption compactOption = new SyncRewriteDataFilesActionOption(tableProps);
+        MetricOption metricOption = null;
+        if (tableProps.containsKey(INLONG_METRIC.key())) {
+            metricOption = new MetricOption(
+                    tableProps.getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()),
+                    tableProps.getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()));
+        }
+        boolean appendMode = tableProps.containsKey(ICEBERG_IGNORE_ALL_CHANGELOG.key())
+                ? Boolean.parseBoolean(tableProps.get(ICEBERG_IGNORE_ALL_CHANGELOG.key()))
+                : ICEBERG_IGNORE_ALL_CHANGELOG.defaultValue();
+
+        TableLoader tableLoader;
+        if (catalog != null) {
+            tableLoader = createTableLoader(catalog, objectPath);
+        } else {
+            tableLoader = createTableLoader(catalogTable, tableProps, objectPath.getDatabaseName(),
+                    objectPath.getObjectName());
+        }
+
+        return new IcebergTableSink(tableLoader, tableSchema, compactOption, metricOption, appendMode);
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = Sets.newHashSet();
+        options.add(CATALOG_TYPE);
+        options.add(CATALOG_NAME);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = Sets.newHashSet();
+        options.add(CATALOG_DATABASE);
+        options.add(CATALOG_TABLE);
+        options.add(ICEBERG_IGNORE_ALL_CHANGELOG);
+        options.add(INLONG_METRIC);
+        options.add(INLONG_AUDIT);
+        return options;
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return FACTORY_IDENTIFIER;
+    }
+
+    private static TableLoader createTableLoader(CatalogBaseTable catalogBaseTable,
+            Map<String, String> tableProps,
+            String databaseName,
+            String tableName) {
+        Configuration flinkConf = new Configuration();
+        tableProps.forEach(flinkConf::setString);
+
+        String catalogName = flinkConf.getString(CATALOG_NAME);
+        Preconditions.checkNotNull(catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
+
+        String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
+        Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");
+
+        String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
+        Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");
+
+        org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
+        FlinkCatalogFactory factory = new FlinkCatalogFactory();
+        FlinkCatalog flinkCatalog = (FlinkCatalog) factory.createCatalog(catalogName, tableProps, hadoopConf);
+        ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);
+
+        // Create database if not exists in the external catalog.
+        if (!flinkCatalog.databaseExists(catalogDatabase)) {
+            try {
+                flinkCatalog.createDatabase(catalogDatabase,
+                        new CatalogDatabaseImpl(Maps.newHashMap(), null), true);
+            } catch (DatabaseAlreadyExistException e) {
+                throw new AlreadyExistsException(
+                        e, "Database %s already exists in the iceberg catalog %s.", catalogDatabase, catalogName);
+            }
+        }
+
+        // Create table if not exists in the external catalog.
+        if (!flinkCatalog.tableExists(objectPath)) {
+            try {
+                flinkCatalog.createIcebergTable(objectPath, catalogBaseTable, true);
+            } catch (TableAlreadyExistException e) {
+                throw new AlreadyExistsException(e, "Table %s already exists in the database %s and catalog %s",
+                        catalogTable, catalogDatabase, catalogName);
+            }
+        }
+
+        return TableLoader.fromCatalog(
+                flinkCatalog.getCatalogLoader(), TableIdentifier.of(catalogDatabase, catalogTable));
+    }
+
+    private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {
+        Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
+        return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
+    }
+}
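
For reference, a sink table served by this factory is normally declared through Flink SQL with the options defined above. The snippet below is only an illustrative sketch: the catalog type, names and option values are placeholders, and a real DLC deployment additionally needs the DLC catalog and auth properties (region, secret id/key, engine, and so on) that are consumed by the catalog and by the compact action.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DlcInlongSinkSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // 'dlc-inlong' is the FACTORY_IDENTIFIER registered above. catalog-type and catalog-name
            // are required options; catalog-database, catalog-table, sink.ignore.changelog,
            // inlong.metric and inlong.audit are optional.
            tEnv.executeSql(
                    "CREATE TABLE dlc_sink (\n"
                            + "  id INT,\n"
                            + "  name STRING\n"
                            + ") WITH (\n"
                            + "  'connector' = 'dlc-inlong',\n"
                            + "  'catalog-type' = 'hive',\n"
                            + "  'catalog-name' = 'my_dlc_catalog',\n"
                            + "  'catalog-database' = 'my_db',\n"
                            + "  'catalog-table' = 'my_table',\n"
                            + "  'sink.ignore.changelog' = 'false'\n"
                            + ")");
        }
    }
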
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
similarity index 74%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
index b5f3bef9e..112b6a36b 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
@@ -17,11 +17,10 @@
  * under the License.
  */
 
-package org.apache.inlong.sort.iceberg;
+package org.apache.inlong.sort.iceberg.flink;
 
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
-import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -31,39 +30,51 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.inlong.sort.iceberg.sink.FlinkSink;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesActionOption;
+import org.apache.inlong.sort.iceberg.flink.sink.FlinkSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
-import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
-
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ * Add an option `sink.ignore.changelog` to support insert-only mode without a primary key.
+ * Add a table property `write.compact.enable` to support small file compaction.
+ * Add options `inlong.metric` and `inlong.audit` to support collecting InLong metrics and audit data.
+ */
 public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
-
     private static final Logger LOG = LoggerFactory.getLogger(IcebergTableSink.class);
-
     private final TableLoader tableLoader;
     private final TableSchema tableSchema;
-
-    private final CatalogTable catalogTable;
+    private final SyncRewriteDataFilesActionOption compactAction;
+    private final MetricOption metricOption;
+    private final boolean appendMode;
 
     private boolean overwrite = false;
 
     private IcebergTableSink(IcebergTableSink toCopy) {
         this.tableLoader = toCopy.tableLoader;
         this.tableSchema = toCopy.tableSchema;
+        this.compactAction = toCopy.compactAction;
+        this.metricOption = toCopy.metricOption;
         this.overwrite = toCopy.overwrite;
-        this.catalogTable = toCopy.catalogTable;
+        this.appendMode = toCopy.appendMode;
     }
 
-    public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema, CatalogTable catalogTable) {
+    public IcebergTableSink(
+            TableLoader tableLoader,
+            TableSchema tableSchema,
+            SyncRewriteDataFilesActionOption compactAction,
+            MetricOption metricOption,
+            boolean appendMode) {
         this.tableLoader = tableLoader;
         this.tableSchema = tableSchema;
-        this.catalogTable = catalogTable;
+        this.compactAction = compactAction;
+        this.metricOption = metricOption;
+        this.appendMode = appendMode;
     }
 
     @Override
@@ -80,8 +91,9 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
                 .tableSchema(tableSchema)
                 .equalityFieldColumns(equalityColumns)
                 .overwrite(overwrite)
-                .metric(catalogTable.getOptions().getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()),
-                        catalogTable.getOptions().getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()))
+                .metric(metricOption)
+                .appendMode(appendMode)
+                .compact(compactAction)
                 .append();
     }
 
@@ -92,8 +104,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
 
     @Override
     public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
-        if (org.apache.flink.configuration.Configuration.fromMap(catalogTable.getOptions())
-                .get(ICEBERG_IGNORE_ALL_CHANGELOG)) {
+        if (appendMode) {
             LOG.warn("Iceberg sink receives all changelog records. "
                     + "Regard any other record as an insert-only record.");
             return ChangelogMode.all();
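
The sink is normally built by FlinkDynamicTableFactory above, but the constructor wiring can be sketched directly as below. Everything here is a placeholder: the Hadoop table path, schema and properties are invented for illustration, and metricOption stays null exactly as in createDynamicTableSink when 'inlong.metric' is not configured.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.iceberg.flink.IcebergTableSink;
    import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesActionOption;

    public class IcebergTableSinkSketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("catalog-database", "my_db");
            props.put("catalog-table", "my_table");

            TableSchema schema = TableSchema.builder()
                    .field("id", DataTypes.INT())
                    .field("name", DataTypes.STRING())
                    .build();
            TableLoader loader = TableLoader.fromHadoopTable("/warehouse/my_db/my_table");

            SyncRewriteDataFilesActionOption compact = new SyncRewriteDataFilesActionOption(props);
            MetricOption metric = null;     // no inlong.metric configured in this sketch
            boolean appendMode = false;     // keep changelog semantics (upsert/delete handled as such)

            // The planner would normally create this instance and later call getSinkRuntimeProvider.
            IcebergTableSink sink = new IcebergTableSink(loader, schema, compact, metric, appendMode);
        }
    }
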
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/RewriteResult.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/RewriteResult.java
new file mode 100644
index 000000000..3e9e421ae
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/RewriteResult.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.actions;
+
+public class RewriteResult {
+    private String result;
+
+    public RewriteResult(String result) {
+        this.result = result;
+    }
+
+    public String getResult() {
+        return result;
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/SyncRewriteDataFilesAction.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/SyncRewriteDataFilesAction.java
new file mode 100644
index 000000000..e9b83e9f4
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/SyncRewriteDataFilesAction.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.actions;
+
+import org.apache.iceberg.actions.Action;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Performs the rewrite (small file compaction) action by submitting a Spark SQL task to DLC.
+ */
+public class SyncRewriteDataFilesAction implements
+        Action<SyncRewriteDataFilesAction, RewriteResult> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SyncRewriteDataFilesAction.class);
+    private static final String DLC_JDBC_CLASS = "com.tencent.cloud.dlc.jdbc.DlcDriver";
+
+    private SyncRewriteDataFilesActionOption options;
+    private AtomicInteger snapshotCounter;
+
+    public SyncRewriteDataFilesAction(SyncRewriteDataFilesActionOption option) {
+        this.snapshotCounter = new AtomicInteger();
+        this.options = option;
+    }
+
+    @Override
+    public SyncRewriteDataFilesAction option(String name, String value) {
+        this.options.option(name, value);
+        return this;
+    }
+
+    @Override
+    public SyncRewriteDataFilesAction options(Map<String, String> options) {
+        this.options.options(options);
+        return this;
+    }
+
+    @Override
+    public RewriteResult execute() {
+        if (!shouldExecute()) {
+            return new RewriteResult("Skipped this compaction.");
+        }
+
+        Connection connection = buildConnection();
+        if (connection == null) {
+            LOG.error("Cannot get a DLC JDBC connection.");
+            return new RewriteResult("fail.");
+        }
+
+        String rewriteTableSql = options.rewriteSql();
+        try {
+            Statement statement = connection.createStatement();
+            LOG.info("Executing compaction SQL: {}", rewriteTableSql);
+            boolean firstIsResultSet = statement.execute(rewriteTableSql);
+            if (firstIsResultSet) {
+                ResultSet rs = statement.getResultSet();
+                ResultSetMetaData rsmd = rs.getMetaData();
+                int columnsNumber = rsmd.getColumnCount();
+                while (rs.next()) {
+                    StringBuilder lineResult = new StringBuilder();
+                    for (int i = 1; i <= columnsNumber; i++) {
+                        if (i > 1) {
+                            lineResult.append(",  ");
+                        }
+                        lineResult.append(rsmd.getColumnName(i) + ":" + rs.getString(i));
+                    }
+                    LOG.info("[Result:]{}", lineResult);
+                }
+            } else {
+                LOG.info("[Result:]there is no output.");
+            }
+            statement.close();
+            connection.close();
+        } catch (SQLException e) {
+            LOG.warn("[Result:]Failed to execute rewrite SQL ({}).", rewriteTableSql, e);
+            return new RewriteResult("fail.");
+        }
+        return new RewriteResult("success.");
+    }
+
+    private boolean shouldExecute() {
+        return snapshotCounter.incrementAndGet() % options.interval() == 0;
+    }
+
+    private Connection buildConnection() {
+        Connection connection = null;
+        String url = options.url();
+        try {
+            Class.forName(DLC_JDBC_CLASS);
+            connection = DriverManager.getConnection(
+                    url,
+                    options.secretId(),
+                    options.secretKey());
+            // get meta data
+            DatabaseMetaData metaData = connection.getMetaData();
+            LOG.info("DLC product = {}, DLC jdbc version = {}, DLC jdbc = '{}'",
+                    metaData.getDatabaseProductName(), metaData.getDriverMajorVersion(), url);
+        } catch (SQLException e) {
+            LOG.error("Failed to create connection. Please check the configuration. Request URL: {}.", url, e);
+        } catch (ClassNotFoundException e) {
+            LOG.error("DLC JDBC driver class not found. Please check the classpath ({}).",
+                    System.getProperty("java.class.path"), e);
+        }
+        return connection;
+    }
+}
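
A rough usage sketch of the action follows. All values are placeholders; 'secret_id' and 'secret_key' are the option keys defined in SyncRewriteDataFilesActionOption, and execute() only submits the rewrite SQL on every N-th invocation according to the configured compaction interval, otherwise it returns a skip result.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.inlong.sort.iceberg.flink.actions.RewriteResult;
    import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesAction;
    import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesActionOption;

    public class CompactActionSketch {
        public static void main(String[] args) {
            Map<String, String> tableProps = new HashMap<>();
            tableProps.put("catalog-database", "my_db");
            tableProps.put("catalog-table", "my_table");

            SyncRewriteDataFilesActionOption option = new SyncRewriteDataFilesActionOption(tableProps);
            SyncRewriteDataFilesAction action = new SyncRewriteDataFilesAction(option)
                    .option("secret_id", "placeholder-secret-id")
                    .option("secret_key", "placeholder-secret-key");

            // Submits the rewrite SQL over DLC JDBC once the snapshot counter reaches the interval.
            RewriteResult result = action.execute();
            System.out.println(result.getResult());
        }
    }
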
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/SyncRewriteDataFilesActionOption.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/SyncRewriteDataFilesActionOption.java
new file mode 100644
index 000000000..96d59623c
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/SyncRewriteDataFilesActionOption.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.actions;
+
+import com.qcloud.dlc.common.Constants;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.flink.FlinkCatalogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.ACTION_AUTO_COMPACT_OPTIONS;
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.COMPACT_INTERVAL;
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.COMPACT_INTERVAL_DEFAULT;
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.COMPACT_PREFIX;
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.COMPACT_RESOUCE_POOL;
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.COMPACT_RESOUCE_POOL_DEFAULT;
+import static org.apache.inlong.sort.iceberg.flink.FlinkDynamicTableFactory.CATALOG_DATABASE;
+import static org.apache.inlong.sort.iceberg.flink.FlinkDynamicTableFactory.CATALOG_TABLE;
+
+public class SyncRewriteDataFilesActionOption implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(SyncRewriteDataFilesAction.class);
+
+    private Map<String, String> properties;
+
+    public static final String URL = "url";
+
+    public static final String URL_REGION = "region";
+    public static final String URL_REGION_DEFAULT = "ap-beijing";
+
+    public static final String URL_DEFAULT_DATABASE = "database_name";
+    public static final String URL_DEFAULT_DATABASE_DEFAULT = "default";
+
+    public static final String URL_ENDPOINT = "endpoint";
+    public static final String URL_ENDPOINT_DEFAULT = "dlc.tencentcloudapi.com";
+
+    public static final String URL_TASK_TYPE = "task_type";
+    public static final String URL_TASK_TYPE_DEFAULT = "SparkSQLTask";
+
+    public static final String URL_DATA_SOURCE = "datasource_connection_name";
+    public static final String URL_DATA_SOURCE_DEFAULT = "DataLakeCatalog";
+
+    public static final String URL_DATA_RESOURCE_NAME = "data_engine_name";
+
+    public static final String AUTH_SECRET_ID = "secret_id";
+    public static final String AUTH_SECRET_KEY = "secret_key";
+
+    public static final String REWRITE_DB_NAME = "db_name";
+    public static final String REWRITE_TABLE_NAME = "table_name";
+
+    public SyncRewriteDataFilesActionOption(Map<String, String> tableProperties) {
+        Preconditions.checkNotNull(CATALOG_DATABASE.key());
+        Preconditions.checkNotNull(CATALOG_TABLE.key());
+        Preconditions.checkNotNull(Constants.DLC_SECRET_ID_CONF);
+        Preconditions.checkNotNull(Constants.DLC_SECRET_KEY_CONF);
+        this.properties = new HashMap<>();
+        Optional.ofNullable(tableProperties.get("qcloud.dlc.jdbc.url")).ifPresent(v -> properties.put(URL, v));
+        properties.put(URL_REGION, tableProperties.get(Constants.DLC_REGION_CONF));
+        properties.put(AUTH_SECRET_ID, tableProperties.get(Constants.DLC_SECRET_ID_CONF));
+        properties.put(AUTH_SECRET_KEY, tableProperties.get(Constants.DLC_SECRET_KEY_CONF));
+        properties.put(URL_DEFAULT_DATABASE, tableProperties.get(FlinkCatalogFactory.DEFAULT_DATABASE));
+        properties.put(REWRITE_DB_NAME, tableProperties.get(CATALOG_DATABASE.key()));
+        properties.put(REWRITE_TABLE_NAME, tableProperties.get(CATALOG_TABLE.key()));
+    }
+
+    public void option(String name, String value) {
+        properties.put(name, value);
+    }
+
+    public void options(Map<String, String> newOptions) {
+        properties.putAll(newOptions);
+    }
+
+    public String url() {
+        String jdbcPrefix = "jdbc:dlc:";
+        String endpoint;
+        Map<String, String> urlParams = new HashMap<>();
+        if (properties.get(URL) != null) {
+            String url = properties.get(URL);
+            int splitPoint = url.indexOf("?") == -1 ? url.length() : url.indexOf("?");
+            endpoint = url.substring(jdbcPrefix.length(), splitPoint);
+            Stream.of(url.substring(Math.min(splitPoint + 1, url.length())).split("&")) // url may have no query part
+                    .forEach(kv -> {
+                        String[] param = kv.split("=");
+                        if (param.length == 2) {
+                            urlParams.put(param[0], param[1]);
+                        }
+                    });
+            Optional.ofNullable(properties.get(COMPACT_RESOUCE_POOL))
+                    .ifPresent(v -> urlParams.put(URL_DATA_RESOURCE_NAME, v));
+        } else {
+            endpoint = properties.getOrDefault(URL_ENDPOINT, URL_ENDPOINT_DEFAULT);
+            urlParams.put(URL_TASK_TYPE, properties.getOrDefault(URL_TASK_TYPE, URL_TASK_TYPE_DEFAULT));
+            urlParams.put(URL_DEFAULT_DATABASE,
+                    properties.getOrDefault(URL_DEFAULT_DATABASE, URL_DEFAULT_DATABASE_DEFAULT));
+            urlParams.put(URL_DATA_SOURCE, properties.getOrDefault(URL_DATA_SOURCE, URL_DATA_SOURCE_DEFAULT));
+            urlParams.put(URL_REGION, properties.getOrDefault(URL_REGION, URL_REGION_DEFAULT));
+            urlParams.put(URL_DATA_RESOURCE_NAME,
+                    properties.getOrDefault(COMPACT_RESOUCE_POOL, COMPACT_RESOUCE_POOL_DEFAULT));
+
+        }
+        List<String> urlParamsList =
+                urlParams.entrySet().stream().map(kv -> kv.getKey() + "=" + kv.getValue()).collect(Collectors.toList());
+        return jdbcPrefix + endpoint + "?" + String.join("&", urlParamsList);
+
+    }
+
+    public String secretId() {
+        return properties.get(AUTH_SECRET_ID);
+    }
+
+    public String secretKey() {
+        return properties.get(AUTH_SECRET_KEY);
+    }
+
+    public String rewriteSql() {
+        String dbName = properties.get(REWRITE_DB_NAME);
+        String tableName = properties.get(REWRITE_TABLE_NAME);
+        Preconditions.checkNotNull(dbName);
+        Preconditions.checkNotNull(tableName);
+        String wholeTableName = String.format("%s.%s", dbName, tableName);
+        String rewriteOptions = String.join(",",
+                ACTION_AUTO_COMPACT_OPTIONS.stream()
+                    .filter(properties::containsKey)
+                    .map(k -> String.format("'%s', '%s'", k.substring(COMPACT_PREFIX.length()), properties.get(k)))
+                    .collect(Collectors.toList()));
+        String rewriteTableSql;
+        if (rewriteOptions.isEmpty()) {
+            rewriteTableSql = String.format(
+                    "CALL `DataLakeCatalog`.`system`.rewrite_data_files (`table` => '%s')",
+                    wholeTableName);
+        } else {
+            rewriteTableSql =
+                    String.format(
+                            "CALL `DataLakeCatalog`.`system`.rewrite_data_files"
+                                    + "(`table` => '%s', options => map(%s))",
+                            wholeTableName, rewriteOptions);
+        }
+        return rewriteTableSql;
+    }
+
+    public int interval() {
+        return PropertyUtil.propertyAsInt(properties, COMPACT_INTERVAL, COMPACT_INTERVAL_DEFAULT);
+    }
+}
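
For illustration, with the hypothetical properties below rewriteSql() yields the first statement; when compaction options are present (table properties carrying the compact prefix from CompactTableProperties) they are forwarded through the options map, roughly as in the second form, where 'target-file-size-bytes' is just an example option name. Imports are as in the sketch after SyncRewriteDataFilesAction above.

    Map<String, String> tableProps = new HashMap<>();
    tableProps.put("catalog-database", "my_db");    // becomes db_name
    tableProps.put("catalog-table", "my_table");    // becomes table_name

    SyncRewriteDataFilesActionOption opt = new SyncRewriteDataFilesActionOption(tableProps);
    String sql = opt.rewriteSql();
    // With no compaction options:
    //   CALL `DataLakeCatalog`.`system`.rewrite_data_files (`table` => 'my_db.my_table')
    // With compaction options set, for example:
    //   CALL `DataLakeCatalog`.`system`.rewrite_data_files(`table` => 'my_db.my_table',
    //           options => map('target-file-size-bytes', '134217728'))
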
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/BaseDeltaTaskWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/BaseDeltaTaskWriter.java
new file mode 100644
index 000000000..702353999
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.data.RowDataProjection;
+import org.apache.iceberg.io.BaseTaskWriter;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
+
+    private final Schema schema;
+    private final Schema deleteSchema;
+    private final RowDataWrapper wrapper;
+    private final RowDataWrapper keyWrapper;
+    private final RowDataProjection keyProjection;
+    private final boolean upsert;
+
+    BaseDeltaTaskWriter(PartitionSpec spec,
+            FileFormat format,
+            FileAppenderFactory<RowData> appenderFactory,
+            OutputFileFactory fileFactory,
+            FileIO io,
+            long targetFileSize,
+            Schema schema,
+            RowType flinkSchema,
+            List<Integer> equalityFieldIds,
+            boolean upsert) {
+        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+        this.schema = schema;
+        this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
+        this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+        this.upsert = upsert;
+        this.keyWrapper =  new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
+        this.keyProjection = RowDataProjection.create(schema, deleteSchema);
+    }
+
+    abstract RowDataDeltaWriter route(RowData row);
+
+    RowDataWrapper wrapper() {
+        return wrapper;
+    }
+
+    @Override
+    public void write(RowData row) throws IOException {
+        RowDataDeltaWriter writer = route(row);
+
+        switch (row.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                if (upsert) {
+                    writer.deleteKey(keyProjection.wrap(row));
+                }
+                writer.write(row);
+                break;
+
+            case UPDATE_BEFORE:
+                if (upsert) {
+                    break;  // UPDATE_BEFORE is not needed for an upsert UPDATE; do nothing to avoid deleting the row twice
+                }
+                writer.delete(row);
+                break;
+            case DELETE:
+                writer.delete(row);
+                break;
+
+            default:
+                throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
+        }
+    }
+
+    protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
+        RowDataDeltaWriter(PartitionKey partition) {
+            super(partition, schema, deleteSchema);
+        }
+
+        @Override
+        protected StructLike asStructLike(RowData data) {
+            return wrapper.wrap(data);
+        }
+
+        @Override
+        protected StructLike asStructLikeKey(RowData data) {
+            return keyWrapper.wrap(data);
+        }
+    }
+}
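
To make the write() contract above concrete, here is a sketch only: 'writer' stands for a concrete delta writer produced by the task writer factory for a table with an INT key column and a STRING value column, the method would have to live in this package because BaseDeltaTaskWriter is package-private, and GenericRowData, StringData and RowKind come from the Flink table runtime.

    // Upsert mode: an UPDATE_AFTER row first emits an equality delete on the key columns,
    // then an insert; UPDATE_BEFORE rows are dropped so the same row is not deleted twice.
    void illustrate(BaseDeltaTaskWriter writer) throws IOException {
        writer.write(GenericRowData.ofKind(RowKind.UPDATE_AFTER, 1, StringData.fromString("new-name")));

        // DELETE rows are always routed to the delta writer's delete(row).
        writer.write(GenericRowData.ofKind(RowKind.DELETE, 1, StringData.fromString("new-name")));
    }
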
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifests.java
similarity index 95%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifests.java
index 6b52dff8b..d9b21faf2 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifests.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.inlong.sort.iceberg.sink;
+package org.apache.inlong.sort.iceberg.flink.sink;
 
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -25,6 +25,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 import java.util.List;
 
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
 class DeltaManifests {
 
     private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0];
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifestsSerializer.java
similarity index 97%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifestsSerializer.java
index fb2187d76..3d6a4f71f 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifestsSerializer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.inlong.sort.iceberg.sink;
+package org.apache.inlong.sort.iceberg.flink.sink;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.iceberg.ManifestFile;
@@ -30,6 +30,9 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
 class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
     private static final int VERSION_1 = 1;
     private static final int VERSION_2 = 2;
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkManifestUtil.java
similarity index 94%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkManifestUtil.java
index 1cbe0d70b..6a444d1d3 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkManifestUtil.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.inlong.sort.iceberg.sink;
+package org.apache.inlong.sort.iceberg.flink.sink;
 
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -38,6 +38,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.function.Supplier;
 
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
 class FlinkManifestUtil {
     private static final int FORMAT_V2 = 2;
     private static final Long DUMMY_SNAPSHOT_ID = 0L;
@@ -62,10 +65,11 @@ class FlinkManifestUtil {
         }
     }
 
-    static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
-            long attemptNumber) {
+    static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, String operatorUniqueId,
+            int subTaskId, long attemptNumber) {
         TableOperations ops = ((HasTableOperations) table).operations();
-        return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
+        return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, operatorUniqueId,
+                subTaskId, attemptNumber);
     }
 
     static DeltaManifests writeCompletedFiles(WriteResult result,
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
similarity index 78%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
index d86857338..0b48fb169 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
@@ -1,502 +1,532 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.inlong.sort.iceberg.sink;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DistributionMode;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.SerializableTable;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
-import org.apache.iceberg.flink.sink.TaskWriterFactory;
-import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
-import org.apache.iceberg.io.WriteResult;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PropertyUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.function.Function;
-
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
-import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
-import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
-import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
-import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
-
-public class FlinkSink {
-    private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
-
-    private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
-    private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();
-
-    private FlinkSink() {
-    }
-
-    /**
-     * Initialize a {@link Builder} to export the data from generic input data stream into iceberg table. We use
-     * {@link RowData} inside the sink connector, so users need to provide a mapper function and a
-     * {@link TypeInformation} to convert those generic records to a RowData DataStream.
-     *
-     * @param input      the generic source input data stream.
-     * @param mapper     function to convert the generic data to {@link RowData}
-     * @param outputType to define the {@link TypeInformation} for the input data.
-     * @param <T>        the data type of records.
-     * @return {@link Builder} to connect the iceberg table.
-     */
-    public static <T> Builder builderFor(DataStream<T> input,
-            MapFunction<T, RowData> mapper,
-            TypeInformation<RowData> outputType) {
-        return new Builder().forMapperOutputType(input, mapper, outputType);
-    }
-
-    /**
-     * Initialize a {@link Builder} to export the data from input data stream with {@link Row}s into iceberg table.
-     * We use {@link RowData} inside the sink connector, so users need to provide a {@link TableSchema} for builder to
-     * convert those {@link Row}s to a {@link RowData} DataStream.
-     *
-     * @param input       the source input data stream with {@link Row}s.
-     * @param tableSchema defines the {@link TypeInformation} for input data.
-     * @return {@link Builder} to connect the iceberg table.
-     */
-    public static Builder forRow(DataStream<Row> input, TableSchema tableSchema) {
-        RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
-        DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
-
-        DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes);
-        return builderFor(input, rowConverter::toInternal, FlinkCompatibilityUtil.toTypeInfo(rowType))
-                .tableSchema(tableSchema);
-    }
-
-    /**
-     * Initialize a {@link Builder} to export the data from input data stream with {@link RowData}s into iceberg table.
-     *
-     * @param input the source input data stream with {@link RowData}s.
-     * @return {@link Builder} to connect the iceberg table.
-     */
-    public static Builder forRowData(DataStream<RowData> input) {
-        return new Builder().forRowData(input);
-    }
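
The DLC copy of this builder added by the patch keeps the same fluent surface, so wiring a DataStream<RowData> into the sink looks roughly like the sketch below. The stream, the table loader and the 'id' equality column are placeholders supplied by the caller.

    import java.util.Collections;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.inlong.sort.iceberg.flink.sink.FlinkSink;

    public class FlinkSinkSketch {
        public static void wire(DataStream<RowData> rows, TableLoader tableLoader) {
            FlinkSink.forRowData(rows)
                    .tableLoader(tableLoader)
                    .equalityFieldColumns(Collections.singletonList("id"))  // hypothetical key column
                    .overwrite(false)
                    .append();
        }
    }
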
-
-    public static class Builder {
-        private Function<String, DataStream<RowData>> inputCreator = null;
-        private TableLoader tableLoader;
-        private Table table;
-        private TableSchema tableSchema;
-        private boolean overwrite = false;
-        private DistributionMode distributionMode = null;
-        private Integer writeParallelism = null;
-        private boolean upsert = false;
-        private List<String> equalityFieldColumns = null;
-        private String uidPrefix = null;
-        private String inlongMetric = null;
-        private String auditHostAndPorts = null;
-
-        private Builder() {
-        }
-
-        private Builder forRowData(DataStream<RowData> newRowDataInput) {
-            this.inputCreator = ignored -> newRowDataInput;
-            return this;
-        }
-
-        private <T> Builder forMapperOutputType(DataStream<T> input,
-                MapFunction<T, RowData> mapper,
-                TypeInformation<RowData> outputType) {
-            this.inputCreator = newUidPrefix -> {
-                if (newUidPrefix != null) {
-                    return input.map(mapper, outputType)
-                            .name(operatorName(newUidPrefix))
-                            .uid(newUidPrefix + "-mapper");
-                } else {
-                    return input.map(mapper, outputType);
-                }
-            };
-            return this;
-        }
-
-        /**
-         * This iceberg {@link Table} instance is used for initializing {@link IcebergStreamWriter} which will write all
-         * the records into {@link DataFile}s and emit them to downstream operator. Providing a table would avoid so
-         * many table loading from each separate task.
-         *
-         * @param newTable the loaded iceberg table instance.
-         * @return {@link Builder} to connect the iceberg table.
-         */
-        public Builder table(Table newTable) {
-            this.table = newTable;
-            return this;
-        }
-
-        /**
-         * The table loader is used for loading tables in {@link IcebergFilesCommitter} lazily, we need this loader
-         * because {@link Table} is not serializable and could not just use the loaded table from Builder#table in the
-         * remote task manager.
-         *
-         * @param newTableLoader to load iceberg table inside tasks.
-         * @return {@link Builder} to connect the iceberg table.
-         */
-        public Builder tableLoader(TableLoader newTableLoader) {
-            this.tableLoader = newTableLoader;
-            return this;
-        }
-
-        public Builder tableSchema(TableSchema newTableSchema) {
-            this.tableSchema = newTableSchema;
-            return this;
-        }
-
-        public Builder overwrite(boolean newOverwrite) {
-            this.overwrite = newOverwrite;
-            return this;
-        }
-
-        /**
-         * Add metric output for iceberg writer
-         * @param inlongMetric
-         * @param auditHostAndPorts
-         * @return
-         */
-        public Builder metric(String inlongMetric, String auditHostAndPorts) {
-            this.inlongMetric = inlongMetric;
-            this.auditHostAndPorts = auditHostAndPorts;
-            return this;
-        }
-
-        /**
-         * Configure the write {@link DistributionMode} that the flink sink will use. Currently, flink support
-         * {@link DistributionMode#NONE} and {@link DistributionMode#HASH}.
-         *
-         * @param mode to specify the write distribution mode.
-         * @return {@link Builder} to connect the iceberg table.
-         */
-        public Builder distributionMode(DistributionMode mode) {
-            Preconditions.checkArgument(!DistributionMode.RANGE.equals(mode),
-                    "Flink does not support 'range' write distribution mode now.");
-            this.distributionMode = mode;
-            return this;
-        }
-
-        /**
-         * Configuring the write parallel number for iceberg stream writer.
-         *
-         * @param newWriteParallelism the number of parallel iceberg stream writer.
-         * @return {@link Builder} to connect the iceberg table.
-         */
-        public Builder writeParallelism(int newWriteParallelism) {
-            this.writeParallelism = newWriteParallelism;
-            return this;
-        }
-
-        /**
-         * All INSERT/UPDATE_AFTER events from input stream will be transformed to UPSERT events, which means it will
-         * DELETE the old records and then INSERT the new records. In partitioned table, the partition fields should be
-         * a subset of equality fields, otherwise the old row that located in partition-A could not be deleted by the
-         * new row that located in partition-B.
-         *
-         * @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
-         * @return {@link Builder} to connect the iceberg table.
-         */
-        public Builder upsert(boolean enabled) {
-            this.upsert = enabled;
-            return this;
-        }
-
-        /**
-         * Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
-         *
-         * @param columns defines the iceberg table's key.
-         * @return {@link Builder} to connect the iceberg table.
-         */
-        public Builder equalityFieldColumns(List<String> columns) {
-            this.equalityFieldColumns = columns;
-            return this;
-        }
-
-        /**
-         * Set the uid prefix for FlinkSink operators. Note that FlinkSink internally consists of multiple operators
-         * (like writer, committer, dummy sink etc.) Actually operator uid will be appended with a suffix like
-         * "uidPrefix-writer".
-         * <br><br>
-         * If provided, this prefix is also applied to operator names.
-         * <br><br>
-         * Flink auto generates operator uid if not set explicitly. It is a recommended
-         * <a href="https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/">
-         * best-practice to set uid for all operators</a> before deploying to production. Flink has an option to {@code
-         * pipeline.auto-generate-uid=false} to disable auto-generation and force explicit setting of all operator uid.
-         * <br><br>
-         * Be careful with setting this for an existing job, because now we are changing the operator uid from an
-         * auto-generated one to this new value. When deploying the change with a checkpoint, Flink won't be able to
-         * restore the previous Flink sink operator state (more specifically the committer operator state). You need to
-         * use {@code --allowNonRestoredState} to ignore the previous sink state. During restore Flink sink state is
-         * used to check if last commit was actually successful or not. {@code --allowNonRestoredState} can lead to data
-         * loss if the Iceberg commit failed in the last completed checkpoint.
-         *
-         * @param newPrefix prefix for Flink sink operator uid and name
-         * @return {@link Builder} to connect the iceberg table.
-         */
-        public Builder uidPrefix(String newPrefix) {
-            this.uidPrefix = newPrefix;
-            return this;
-        }
-
-        private <T> DataStreamSink<T> chainIcebergOperators() {
-            Preconditions.checkArgument(inputCreator != null,
-                    "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
-            Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");
-
-            DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);
-
-            if (table == null) {
-                tableLoader.open();
-                try (TableLoader loader = tableLoader) {
-                    this.table = loader.loadTable();
-                } catch (IOException e) {
-                    throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
-                }
-            }
-
-            // Convert the requested flink table schema to flink row type.
-            RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
-
-            // Distribute the records from input data stream based on the write.distribution-mode.
-            DataStream<RowData> distributeStream = distributeDataStream(
-                    rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
-
-            // Add parallel writers that append rows to files
-            SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType);
-
-            // Add single-parallelism committer that commits files
-            // after successful checkpoint or end of input
-            SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
-
-            // Add dummy discard sink
-            return appendDummySink(committerStream);
-        }
-
-        /**
-         * Append the iceberg sink operators to write records to iceberg table.
-         *
-         * @return {@link DataStreamSink} for sink.
-         * @deprecated this will be removed in 0.14.0; use {@link #append()} because its returned {@link DataStreamSink}
-         *             has a more correct data type.
-         */
-        @Deprecated
-        public DataStreamSink<RowData> build() {
-            return chainIcebergOperators();
-        }
-
-        /**
-         * Append the iceberg sink operators to write records to iceberg table.
-         *
-         * @return {@link DataStreamSink} for sink.
-         */
-        public DataStreamSink<Void> append() {
-            return chainIcebergOperators();
-        }
-
-        private String operatorName(String suffix) {
-            return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
-        }
-
-        @SuppressWarnings("unchecked")
-        private <T> DataStreamSink<T> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
-            DataStreamSink<T> resultStream = committerStream
-                    .addSink(new DiscardingSink())
-                    .name(operatorName(String.format("IcebergSink %s", this.table.name())))
-                    .setParallelism(1);
-            if (uidPrefix != null) {
-                resultStream = resultStream.uid(uidPrefix + "-dummysink");
-            }
-            return resultStream;
-        }
-
-        private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
-            IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
-            SingleOutputStreamOperator<Void> committerStream = writerStream
-                    .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
-                    .setParallelism(1)
-                    .setMaxParallelism(1);
-            if (uidPrefix != null) {
-                committerStream = committerStream.uid(uidPrefix + "-committer");
-            }
-            return committerStream;
-        }
-
-        private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
-            // Find out the equality field id list based on the user-provided equality field column names.
-            List<Integer> equalityFieldIds = Lists.newArrayList();
-            if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-                for (String column : equalityFieldColumns) {
-                    org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-                    Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-                            column, table.schema());
-                    equalityFieldIds.add(field.fieldId());
-                }
-            }
-
-            // Fallback to use upsert mode parsed from table properties if don't specify in job level.
-            boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
-                    UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
-
-            // Validate the equality fields and partition fields if we enable the upsert mode.
-            if (upsertMode) {
-                Preconditions.checkState(!overwrite,
-                        "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
-                Preconditions.checkState(!equalityFieldIds.isEmpty(),
-                        "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
-                if (!table.spec().isUnpartitioned()) {
-                    for (PartitionField partitionField : table.spec().fields()) {
-                        Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
-                                "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
-                                partitionField, equalityFieldColumns);
-                    }
-                }
-            }
-
-            IcebergStreamWriter<RowData> streamWriter = createStreamWriter(
-                    table, flinkRowType, equalityFieldIds, upsertMode, inlongMetric, auditHostAndPorts);
-
-            int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
-            SingleOutputStreamOperator<WriteResult> writerStream = input
-                    .transform(operatorName(ICEBERG_STREAM_WRITER_NAME),
-                            TypeInformation.of(WriteResult.class),
-                            streamWriter)
-                    .setParallelism(parallelism);
-            if (uidPrefix != null) {
-                writerStream = writerStream.uid(uidPrefix + "-writer");
-            }
-            return writerStream;
-        }
-
-        private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
-                Map<String, String> properties,
-                PartitionSpec partitionSpec,
-                Schema iSchema,
-                RowType flinkRowType) {
-            DistributionMode writeMode;
-            if (distributionMode == null) {
-                // Fallback to use distribution mode parsed from table properties if don't specify in job level.
-                String modeName = PropertyUtil.propertyAsString(properties,
-                        WRITE_DISTRIBUTION_MODE,
-                        WRITE_DISTRIBUTION_MODE_NONE);
-
-                writeMode = DistributionMode.fromName(modeName);
-            } else {
-                writeMode = distributionMode;
-            }
-
-            switch (writeMode) {
-                case NONE:
-                    return input;
-
-                case HASH:
-                    if (partitionSpec.isUnpartitioned()) {
-                        return input;
-                    } else {
-                        return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
-                    }
-
-                case RANGE:
-                    LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
-                            WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
-                    return input;
-
-                default:
-                    throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
-            }
-        }
-    }
-
-    static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
-        if (requestedSchema != null) {
-            // Convert the flink schema to iceberg schema firstly, then reassign ids to match the existing iceberg
-            // schema.
-            Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
-            TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
-
-            // We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will be promoted
-            // to iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT (backend by 1 'byte'),
-            // we will read 4 bytes rather than 1 byte, it will mess up the byte array in BinaryRowData. So here we must
-            // use flink schema.
-            return (RowType) requestedSchema.toRowDataType().getLogicalType();
-        } else {
-            return FlinkSchemaUtil.convert(schema);
-        }
-    }
-
-    static IcebergStreamWriter<RowData> createStreamWriter(Table table,
-            RowType flinkRowType,
-            List<Integer> equalityFieldIds,
-            boolean upsert,
-            String inlongMetric,
-            String auditHostAndPorts) {
-        Preconditions.checkArgument(table != null, "Iceberg table should't be null");
-        Map<String, String> props = table.properties();
-        long targetFileSize = getTargetFileSizeBytes(props);
-        FileFormat fileFormat = getFileFormat(props);
-
-        Table serializableTable = SerializableTable.copyOf(table);
-        TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
-                serializableTable, flinkRowType, targetFileSize,
-                fileFormat, equalityFieldIds, upsert);
-
-        return new IcebergStreamWriter<>(table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts);
-    }
-
-    private static FileFormat getFileFormat(Map<String, String> properties) {
-        String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
-        return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
-    }
-
-    private static long getTargetFileSizeBytes(Map<String, String> properties) {
-        return PropertyUtil.propertyAsLong(properties,
-                WRITE_TARGET_FILE_SIZE_BYTES,
-                WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.sink;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesActionOption;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ * Add an option `sink.ignore.changelog` to support insert-only mode without a primary key.
+ * Add a table property `write.compact.enable` to support small file compaction.
+ * Add options `inlong.metric` and `inlong.audit` to support collecting inlong metrics and audit data.
+ */
+public class FlinkSink {
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
+
+    private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
+    private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();
+
+    private FlinkSink() {
+    }
+
+    /**
+     * Initialize a {@link FlinkSink.Builder} to export the data from generic input data stream into iceberg table.
+     * We use {@link RowData} inside the sink connector, so users need to provide a mapper function and a
+     * {@link TypeInformation} to convert those generic records to a RowData DataStream.
+     *
+     * @param input      the generic source input data stream.
+     * @param mapper     function to convert the generic data to {@link RowData}
+     * @param outputType to define the {@link TypeInformation} for the input data.
+     * @param <T>        the data type of records.
+     * @return {@link FlinkSink.Builder} to connect the iceberg table.
+     */
+    public static <T> FlinkSink.Builder builderFor(DataStream<T> input,
+            MapFunction<T, RowData> mapper,
+            TypeInformation<RowData> outputType) {
+        return new FlinkSink.Builder().forMapperOutputType(input, mapper, outputType);
+    }
+
+    /**
+     * Initialize a {@link FlinkSink.Builder} to export the data from input data stream with {@link Row}s into iceberg
+     * table. We use {@link RowData} inside the sink connector, so users need to provide a {@link TableSchema} for
+     * builder to convert those {@link Row}s to a {@link RowData} DataStream.
+     *
+     * @param input       the source input data stream with {@link Row}s.
+     * @param tableSchema defines the {@link TypeInformation} for input data.
+     * @return {@link FlinkSink.Builder} to connect the iceberg table.
+     */
+    public static FlinkSink.Builder forRow(DataStream<Row> input, TableSchema tableSchema) {
+        RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
+        DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
+
+        DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes);
+        return builderFor(input, rowConverter::toInternal, FlinkCompatibilityUtil.toTypeInfo(rowType))
+                .tableSchema(tableSchema);
+    }
+
+    /**
+     * Initialize a {@link FlinkSink.Builder} to export the data from input data stream with {@link RowData}s into
+     * iceberg table.
+     *
+     * @param input the source input data stream with {@link RowData}s.
+     * @return {@link FlinkSink.Builder} to connect the iceberg table.
+     */
+    public static FlinkSink.Builder forRowData(DataStream<RowData> input) {
+        return new FlinkSink.Builder().forRowData(input);
+    }
+
+    public static class Builder {
+        private Function<String, DataStream<RowData>> inputCreator = null;
+        private TableLoader tableLoader;
+        private Table table;
+        private TableSchema tableSchema;
+        private SyncRewriteDataFilesActionOption compact;
+        private boolean overwrite = false;
+        private boolean appendMode = false;
+        private DistributionMode distributionMode = null;
+        private Integer writeParallelism = null;
+        private boolean upsert = false;
+        private List<String> equalityFieldColumns = null;
+        private String uidPrefix = null;
+        private MetricOption metricOption = null;
+
+        private Builder() {
+        }
+
+        private FlinkSink.Builder forRowData(DataStream<RowData> newRowDataInput) {
+            this.inputCreator = ignored -> newRowDataInput;
+            return this;
+        }
+
+        private <T> FlinkSink.Builder forMapperOutputType(DataStream<T> input,
+                MapFunction<T, RowData> mapper,
+                TypeInformation<RowData> outputType) {
+            this.inputCreator = newUidPrefix -> {
+                if (newUidPrefix != null) {
+                    return input.map(mapper, outputType)
+                            .name(operatorName(newUidPrefix))
+                            .uid(newUidPrefix + "-mapper");
+                } else {
+                    return input.map(mapper, outputType);
+                }
+            };
+            return this;
+        }
+
+        /**
+         * This iceberg {@link Table} instance is used for initializing {@link IcebergStreamWriter}, which will write
+         * all the records into {@link DataFile}s and emit them to the downstream operator. Providing a table avoids
+         * loading it again from each separate task.
+         *
+         * @param newTable the loaded iceberg table instance.
+         * @return {@link FlinkSink.Builder} to connect the iceberg table.
+         */
+        public FlinkSink.Builder table(Table newTable) {
+            this.table = newTable;
+            return this;
+        }
+
+        /**
+         * The table loader is used for loading tables in {@link IcebergFilesCommitter} lazily; we need this loader
+         * because {@link Table} is not serializable, so the table loaded via Builder#table cannot simply be reused in
+         * the remote task manager.
+         *
+         * @param newTableLoader to load iceberg table inside tasks.
+         * @return {@link FlinkSink.Builder} to connect the iceberg table.
+         */
+        public FlinkSink.Builder tableLoader(TableLoader newTableLoader) {
+            this.tableLoader = newTableLoader;
+            return this;
+        }
+
+        public FlinkSink.Builder tableSchema(TableSchema newTableSchema) {
+            this.tableSchema = newTableSchema;
+            return this;
+        }
+
+        public FlinkSink.Builder overwrite(boolean newOverwrite) {
+            this.overwrite = newOverwrite;
+            return this;
+        }
+
+        /**
+         * The appendMode property is used to insert data without equality field columns.
+         *
+         * @param appendMode whether to write in append-only mode.
+         * @return {@link FlinkSink.Builder} to connect the iceberg table.
+         */
+        public FlinkSink.Builder appendMode(boolean appendMode) {
+            this.appendMode = appendMode;
+            return this;
+        }
+
+        /**
+         * The compact option is used to compact the small files produced by each checkpoint.
+         *
+         * @param compact options for the automatic small-file compaction.
+         * @return {@link FlinkSink.Builder} to connect the iceberg table.
+         */
+        public FlinkSink.Builder compact(SyncRewriteDataFilesActionOption compact) {
+            this.compact = compact;
+            return this;
+        }
+
+        /**
+         * Add metric output for the iceberg writer.
+         *
+         * @param metricOption the inlong metric and audit reporting options.
+         * @return {@link FlinkSink.Builder} to connect the iceberg table.
+         */
+        public Builder metric(MetricOption metricOption) {
+            this.metricOption = metricOption;
+            return this;
+        }
+
+        /**
+         * Configure the write {@link DistributionMode} that the flink sink will use. Currently, flink support
+         * {@link DistributionMode#NONE} and {@link DistributionMode#HASH}.
+         *
+         * @param mode to specify the write distribution mode.
+         * @return {@link FlinkSink.Builder} to connect the iceberg table.
+         */
+        public FlinkSink.Builder distributionMode(DistributionMode mode) {
+            Preconditions.checkArgument(!DistributionMode.RANGE.equals(mode),
+                    "Flink does not support 'range' write distribution mode now.");
+            this.distributionMode = mode;
+            return this;
+        }
+
+        /**
+         * Configuring the write parallel number for iceberg stream writer.
+         *
+         * @param newWriteParallelism the number of parallel iceberg stream writer.
+         * @return {@link FlinkSink.Builder} to connect the iceberg table.
+         */
+        public FlinkSink.Builder writeParallelism(int newWriteParallelism) {
+            this.writeParallelism = newWriteParallelism;
+            return this;
+        }
+
+        /**
+         * All INSERT/UPDATE_AFTER events from the input stream will be transformed to UPSERT events, which means it
+         * will DELETE the old records and then INSERT the new records. In a partitioned table, the partition fields
+         * should be a subset of the equality fields; otherwise an old row located in partition-A could not be deleted
+         * by a new row located in partition-B.
+         *
+         * @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
+         * @return {@link FlinkSink.Builder} to connect the iceberg table.
+         */
+        public FlinkSink.Builder upsert(boolean enabled) {
+            this.upsert = enabled;
+            return this;
+        }
+
+        /**
+         * Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
+         *
+         * @param columns defines the iceberg table's key.
+         * @return {@link FlinkSink.Builder} to connect the iceberg table.
+         */
+        public FlinkSink.Builder equalityFieldColumns(List<String> columns) {
+            this.equalityFieldColumns = columns;
+            return this;
+        }
+
+        /**
+         * Set the uid prefix for FlinkSink operators. Note that FlinkSink internally consists of multiple operators
+         * (like writer, committer, dummy sink etc.) Actually operator uid will be appended with a suffix like
+         * "uidPrefix-writer".
+         * <br><br>
+         * If provided, this prefix is also applied to operator names.
+         * <br><br>
+         * Flink auto generates operator uid if not set explicitly. It is a recommended
+         * <a href="https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/">
+         * best-practice to set uid for all operators</a> before deploying to production. Flink has an option to {@code
+         * pipeline.auto-generate-uid=false} to disable auto-generation and force explicit setting of all operator uid.
+         * <br><br>
+         * Be careful with setting this for an existing job, because now we are changing the operator uid from an
+         * auto-generated one to this new value. When deploying the change with a checkpoint, Flink won't be able to
+         * restore the previous Flink sink operator state (more specifically the committer operator state). You need to
+         * use {@code --allowNonRestoredState} to ignore the previous sink state. During restore Flink sink state is
+         * used to check if the last commit was actually successful or not. {@code --allowNonRestoredState} can lead to data
+         * loss if the Iceberg commit failed in the last completed checkpoint.
+         *
+         * @param newPrefix prefix for Flink sink operator uid and name
+         * @return {@link FlinkSink.Builder} to connect the iceberg table.
+         */
+        public FlinkSink.Builder uidPrefix(String newPrefix) {
+            this.uidPrefix = newPrefix;
+            return this;
+        }
+
+        private <T> DataStreamSink<T> chainIcebergOperators() {
+            Preconditions.checkArgument(inputCreator != null,
+                    "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
+            Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");
+
+            DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);
+
+            if (table == null) {
+                tableLoader.open();
+                try (TableLoader loader = tableLoader) {
+                    this.table = loader.loadTable();
+                } catch (IOException e) {
+                    throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
+                }
+            }
+
+            // Convert the requested flink table schema to flink row type.
+            RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+
+            // Distribute the records from input data stream based on the write.distribution-mode.
+            DataStream<RowData> distributeStream = distributeDataStream(
+                    rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
+
+            // Add parallel writers that append rows to files
+            SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType);
+
+            // Add single-parallelism committer that commits files
+            // after successful checkpoint or end of input
+            SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
+
+            // Add dummy discard sink
+            return appendDummySink(committerStream);
+        }
+
+        /**
+         * Append the iceberg sink operators to write records to iceberg table.
+         *
+         * @return {@link DataStreamSink} for sink.
+         * @deprecated this will be removed in 0.14.0; use {@link #append()} because its returned {@link DataStreamSink}
+         *             has a more correct data type.
+         */
+        @Deprecated
+        public DataStreamSink<RowData> build() {
+            return chainIcebergOperators();
+        }
+
+        /**
+         * Append the iceberg sink operators to write records to iceberg table.
+         *
+         * @return {@link DataStreamSink} for sink.
+         */
+        public DataStreamSink<Void> append() {
+            return chainIcebergOperators();
+        }
+
+        private String operatorName(String suffix) {
+            return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
+        }
+
+        @SuppressWarnings("unchecked")
+        private <T> DataStreamSink<T> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
+            DataStreamSink<T> resultStream = committerStream
+                    .addSink(new DiscardingSink())
+                    .name(operatorName(String.format("IcebergSink %s", this.table.name())))
+                    .setParallelism(1);
+            if (uidPrefix != null) {
+                resultStream = resultStream.uid(uidPrefix + "-dummysink");
+            }
+            return resultStream;
+        }
+
+        private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
+            IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite, compact);
+            SingleOutputStreamOperator<Void> committerStream = writerStream
+                    .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
+                    .setParallelism(1)
+                    .setMaxParallelism(1);
+            if (uidPrefix != null) {
+                committerStream = committerStream.uid(uidPrefix + "-committer");
+            }
+            return committerStream;
+        }
+
+        private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
+            // Find out the equality field id list based on the user-provided equality field column names.
+            List<Integer> equalityFieldIds = Lists.newArrayList();
+            if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
+                for (String column : equalityFieldColumns) {
+                    org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
+                    Preconditions.checkNotNull(field, "Missing required equality field column '%s' "
+                            + "in table schema %s", column, table.schema());
+                    equalityFieldIds.add(field.fieldId());
+                }
+            }
+
+            // Fall back to the upsert mode parsed from table properties if it is not specified at the job level.
+            // Upsert only takes effect when appendMode is disabled.
+            boolean upsertMode = (upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+                    UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT)) && !appendMode;
+
+            // Validate the equality fields and partition fields if we enable the upsert mode.
+            if (upsertMode) {
+                Preconditions.checkState(!overwrite,
+                        "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
+                Preconditions.checkState(!equalityFieldIds.isEmpty(),
+                        "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+                if (!table.spec().isUnpartitioned()) {
+                    for (PartitionField partitionField : table.spec().fields()) {
+                        Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
+                                "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
+                                partitionField, equalityFieldColumns);
+                    }
+                }
+            }
+
+            IcebergStreamWriter<RowData> streamWriter =
+                    createStreamWriter(table, flinkRowType, equalityFieldIds, upsertMode, appendMode, metricOption);
+
+            int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
+            SingleOutputStreamOperator<WriteResult> writerStream = input
+                    .transform(operatorName(ICEBERG_STREAM_WRITER_NAME),
+                            TypeInformation.of(WriteResult.class),
+                            streamWriter)
+                    .setParallelism(parallelism);
+            if (uidPrefix != null) {
+                writerStream = writerStream.uid(uidPrefix + "-writer");
+            }
+            return writerStream;
+        }
+
+        private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
+                Map<String, String> properties,
+                PartitionSpec partitionSpec,
+                Schema iSchema,
+                RowType flinkRowType) {
+            DistributionMode writeMode;
+            if (distributionMode == null) {
+                // Fall back to the distribution mode parsed from table properties if it is not specified at the job level.
+                String modeName = PropertyUtil.propertyAsString(properties,
+                        WRITE_DISTRIBUTION_MODE,
+                        WRITE_DISTRIBUTION_MODE_NONE);
+
+                writeMode = DistributionMode.fromName(modeName);
+            } else {
+                writeMode = distributionMode;
+            }
+
+            switch (writeMode) {
+                case NONE:
+                    return input;
+
+                case HASH:
+                    if (partitionSpec.isUnpartitioned()) {
+                        return input;
+                    } else {
+                        return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+                    }
+
+                case RANGE:
+                    LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
+                            WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
+                    return input;
+
+                default:
+                    throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
+            }
+        }
+    }
+
+    static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
+        if (requestedSchema != null) {
+            // Convert the flink schema to an iceberg schema first, then reassign ids to match the existing
+            // iceberg schema.
+            Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
+            TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
+
+            // We use this flink schema to read values from RowData. Flink's TINYINT and SMALLINT will be promoted
+            // to iceberg INTEGER, which means that if we used iceberg's table schema to read TINYINT (backed by
+            // 1 byte), we would read 4 bytes rather than 1 byte and mess up the byte array in BinaryRowData. So
+            // here we must use the flink schema.
+            return (RowType) requestedSchema.toRowDataType().getLogicalType();
+        } else {
+            return FlinkSchemaUtil.convert(schema);
+        }
+    }
+
+    static IcebergStreamWriter<RowData> createStreamWriter(Table table,
+            RowType flinkRowType,
+            List<Integer> equalityFieldIds,
+            boolean upsert,
+            boolean appendMode,
+            MetricOption metricOption) {
+        Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
+        Map<String, String> props = table.properties();
+        long targetFileSize = getTargetFileSizeBytes(props);
+        FileFormat fileFormat = getFileFormat(props);
+
+        Table serializableTable = SerializableTable.copyOf(table);
+        TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
+                serializableTable, flinkRowType, targetFileSize,
+                fileFormat, equalityFieldIds, upsert, appendMode);
+
+        return new IcebergStreamWriter<>(table.name(), taskWriterFactory, metricOption);
+    }
+
+    private static FileFormat getFileFormat(Map<String, String> properties) {
+        String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+        return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+    }
+
+    private static long getTargetFileSizeBytes(Map<String, String> properties) {
+        return PropertyUtil.propertyAsLong(properties,
+                WRITE_TARGET_FILE_SIZE_BYTES,
+                WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    }
+}
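
A minimal wiring sketch for the builder above, assuming the TableLoader, MetricOption and SyncRewriteDataFilesActionOption are constructed elsewhere (their own builders are not shown in this hunk) and that "id" is a hypothetical equality column; only builder calls that appear in this file are used.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesActionOption;
import org.apache.inlong.sort.iceberg.flink.sink.FlinkSink;

import java.util.Collections;

public class DlcIcebergSinkSketch {
    public static void wireSink(DataStream<RowData> rows,
            TableLoader tableLoader,                         // loads the DLC iceberg table inside tasks
            MetricOption metricOption,                       // inlong metric/audit settings, built elsewhere
            SyncRewriteDataFilesActionOption compactOption)  // small-file compact settings, built elsewhere
    {
        FlinkSink.forRowData(rows)
                .tableLoader(tableLoader)
                .equalityFieldColumns(Collections.singletonList("id")) // hypothetical key column
                .upsert(true)                // turn INSERT/UPDATE_AFTER into UPSERT events
                .compact(compactOption)      // let the committer trigger small-file compaction
                .metric(metricOption)        // report inlong metrics and audit data
                .uidPrefix("dlc-iceberg")    // stable operator uids before going to production
                .append();                   // writer -> committer -> dummy discard sink
    }
}
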
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergFilesCommitter.java
similarity index 81%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergFilesCommitter.java
index 8e5315cad..2ee5ccf7f 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.inlong.sort.iceberg.sink;
+package org.apache.inlong.sort.iceberg.flink.sink;
 
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -48,6 +48,10 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.flink.CompactTableProperties;
+import org.apache.inlong.sort.iceberg.flink.actions.RewriteResult;
+import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesAction;
+import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesActionOption;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,8 +61,16 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Optional;
 import java.util.SortedMap;
 
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.COMPACT_ENABLED;
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.COMPACT_ENABLED_DEFAULT;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ * Add a small-file compact action.
+ */
 class IcebergFilesCommitter extends AbstractStreamOperator<Void>
         implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {
 
@@ -69,9 +81,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
     private static final String FLINK_JOB_ID = "flink.job-id";
 
-    // The max checkpoint id we've committed to iceberg table. As the flink's checkpoint is always increasing, so we
-    // could correctly commit all the data files whose checkpoint id is greater than the max committed one to iceberg
-    // table, for avoiding committing the same data files twice. This id will be attached to iceberg's meta when
+    // The max checkpoint id we've committed to iceberg table. As the flink's checkpoint is always increasing,
+    // so we could correctly commit all the data files whose checkpoint id is greater than the max committed one to
+    // iceberg table, for avoiding committing the same data files twice. This id will be attached to iceberg's meta when
     // committing the iceberg transaction.
     private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
     static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
@@ -110,9 +122,17 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
     private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
 
-    IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions) {
+    // compact file action
+    private SyncRewriteDataFilesActionOption compactOption;
+    private transient SyncRewriteDataFilesAction compactAction;
+
+    IcebergFilesCommitter(
+            TableLoader tableLoader,
+            boolean replacePartitions,
+            SyncRewriteDataFilesActionOption compactOption) {
         this.tableLoader = tableLoader;
         this.replacePartitions = replacePartitions;
+        this.compactOption = compactOption;
     }
 
     @Override
@@ -124,14 +144,24 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
         this.tableLoader.open();
         this.table = tableLoader.loadTable();
 
-        maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
+        // set up the small-file compact action if it is enabled for the table
+        if (PropertyUtil.propertyAsBoolean(table.properties(), COMPACT_ENABLED, COMPACT_ENABLED_DEFAULT)) {
+            compactAction = new SyncRewriteDataFilesAction(compactOption);
+            CompactTableProperties.TABLE_AUTO_COMPACT_PROPERTIES.stream()
+                    .forEach(k -> Optional.ofNullable(table.properties().get(k))
+                                        .ifPresent(v -> compactAction.option(k, v)));
+        }
+
+        maxContinuousEmptyCommits =
+                PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
         Preconditions.checkArgument(maxContinuousEmptyCommits > 0,
                 MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive");
 
         int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
         int attemptId = getRuntimeContext().getAttemptNumber();
-        this.manifestOutputFileFactory = FlinkManifestUtil
-                .createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+        String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
+        this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorUniqueId,
+                subTaskId, attemptId);
         this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
         this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
@@ -141,9 +171,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
             Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
                     "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
 
-            // Since flink's checkpoint id will start from the max-committed-checkpoint-id + 1 in the new flink job even
-            // if it's restored from a snapshot created by another different flink job, so it's safe to assign the max
-            // committed checkpoint id from restored flink job to the current flink job.
+            // Since flink's checkpoint id will start from the max-committed-checkpoint-id + 1 in the new flink job
+            // even if it's restored from a snapshot created by another different flink job, so it's safe to assign
+            // the max committed checkpoint id from restored flink job to the current flink job.
             this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
 
             NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
@@ -189,6 +219,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
         if (checkpointId > maxCommittedCheckpointId) {
             commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
             this.maxCommittedCheckpointId = checkpointId;
+            // run a small-file compaction after each committed checkpoint
+            if (compactAction != null) {
+                RewriteResult result = compactAction.execute();
+                LOG.info("compact action result: {}", result);
+            }
         }
     }
 
@@ -234,8 +269,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
                         .add("checkpointId", checkpointId)
                         .add("manifestPath", manifest.path())
                         .toString();
-                LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink "
-                                + "manifests: {}", details, e);
+                LOG.warn("The iceberg transaction has been committed, but we failed to clean the "
+                                + "temporary flink manifests: {}", details, e);
             }
         }
     }
@@ -244,21 +279,23 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
             long checkpointId) {
         // Partition overwrite does not support delete files.
         int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
-        Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions with delete files.");
+        Preconditions.checkState(deleteFilesNum == 0,
+                "Cannot overwrite partitions with delete files.");
 
         // Commit the overwrite transaction.
         ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
         int numFiles = 0;
         for (WriteResult result : pendingResults.values()) {
-            Preconditions.checkState(result.referencedDataFiles().length == 0,
-                    "Should have no referenced data files.");
+            Preconditions.checkState(
+                    result.referencedDataFiles().length == 0, "Should have no referenced data files.");
 
             numFiles += result.dataFiles().length;
             Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
         }
 
-        commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId);
+        commitOperation(dynamicOverwrite, numFiles, 0,
+                "dynamic partition overwrite", newFlinkJobId, checkpointId);
     }
 
     private void commitDeltaTxn(
@@ -271,8 +308,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
 
             int numFiles = 0;
             for (WriteResult result : pendingResults.values()) {
-                Preconditions.checkState(result.referencedDataFiles().length == 0,
-                        "Should have no referenced data files.");
+                Preconditions.checkState(
+                        result.referencedDataFiles().length == 0, "Should have no referenced data files.");
 
                 numFiles += result.dataFiles().length;
                 Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
@@ -288,11 +325,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
                 WriteResult result = e.getValue();
 
                 // Row delta validations are not needed for streaming changes that write equality deletes. Equality
-                // deletes are applied to data in all previous sequence numbers, so retries may push deletes further in
-                // the future, but do not affect correctness. Position deletes committed to the table in this path are
-                // used only to delete rows from data files that are being added in this commit. There is no way for
-                // data files added along with the delete files to be concurrently removed, so there is no need to
-                // validate the files referenced by the position delete files that are being committed.
+                // deletes are applied to data in all previous sequence numbers, so retries may push deletes further
+                // in the future, but do not affect correctness. Position deletes committed to the table in this path
+                // are used only to delete rows from data files that are being added in this commit. There is no way
+                // for data files added along with the delete files to be concurrently removed, so there is no need
+                // to validate the files referenced by the position delete files that are being committed.
                 RowDelta rowDelta = table.newRowDelta();
 
                 int numDataFiles = result.dataFiles().length;
@@ -335,8 +372,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     }
 
     /**
-     * Write all the complete data files to a newly created manifest file and return the manifest's avro serialized
-     * bytes.
+     * Write all the complete data files to a newly created manifest file and return the manifest's avro
+     * serialized bytes.
      */
     private byte[] writeToManifest(long checkpointId) throws IOException {
         if (writeResultsOfCurrentCkpt.isEmpty()) {
@@ -351,7 +388,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     }
 
     @Override
-    public void dispose() throws Exception {
+    public void close() throws Exception {
         if (tableLoader != null) {
             tableLoader.close();
         }
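
The compaction path above only runs when the table enables it. A hedged sketch of turning it on through the standard Iceberg table-properties API: the `write.compact.enable` key comes from this patch's CompactTableProperties (constant COMPACT_ENABLED); the remaining auto-compact tuning keys are listed in CompactTableProperties.TABLE_AUTO_COMPACT_PROPERTIES and are not repeated here.

import org.apache.iceberg.Table;

public class EnableCompactSketch {
    public static void enableCompaction(Table table) {
        // Once this property is set, IcebergFilesCommitter builds a SyncRewriteDataFilesAction
        // in initializeState() and executes it after every committed checkpoint.
        table.updateProperties()
                .set("write.compact.enable", "true")
                .commit();
    }
}
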
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
similarity index 82%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
index 75eca46c5..9f7cffbcf 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.inlong.sort.iceberg.sink;
+package org.apache.inlong.sort.iceberg.flink.sink;
 
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
@@ -28,14 +28,16 @@ import org.apache.iceberg.flink.sink.TaskWriterFactory;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
 class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
         implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {
 
@@ -43,8 +45,7 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
 
     private final String fullTableName;
     private final TaskWriterFactory<T> taskWriterFactory;
-    private final String inlongMetric;
-    private final String auditHostAndPorts;
+    private final MetricOption metricOption;
 
     private transient TaskWriter<T> writer;
     private transient int subTaskId;
@@ -52,15 +53,12 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
     @Nullable
     private transient SinkMetricData metricData;
 
-    IcebergStreamWriter(
-            String fullTableName,
+    IcebergStreamWriter(String fullTableName,
             TaskWriterFactory<T> taskWriterFactory,
-            String inlongMetric,
-            String auditHostAndPorts) {
+            @Nullable MetricOption metricOption) {
         this.fullTableName = fullTableName;
         this.taskWriterFactory = taskWriterFactory;
-        this.inlongMetric = inlongMetric;
-        this.auditHostAndPorts = auditHostAndPorts;
+        this.metricOption = metricOption;
         setChainingStrategy(ChainingStrategy.ALWAYS);
     }
 
@@ -76,13 +74,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
         this.writer = taskWriterFactory.create();
 
         // Initialize metric
-        if (inlongMetric != null) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            String inlongGroupId = inlongMetricArray[0];
-            String inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            metricData = new SinkMetricData(
-                    inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
+        if (metricOption != null) {
+            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
             metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
             metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
             metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
@@ -110,8 +103,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
     }
 
     @Override
-    public void dispose() throws Exception {
-        super.dispose();
+    public void close() throws Exception {
+        super.close();
         if (writer != null) {
             writer.close();
             writer = null;
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/ManifestOutputFileFactory.java
similarity index 84%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/ManifestOutputFileFactory.java
index f13ff4237..e01fd4d0d 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/ManifestOutputFileFactory.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.inlong.sort.iceberg.sink;
+package org.apache.inlong.sort.iceberg.flink.sink;
 
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.TableOperations;
@@ -28,6 +28,9 @@ import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
 class ManifestOutputFileFactory {
     // Users could define their own flink manifests directory by setting this value in table properties.
     static final String FLINK_MANIFEST_LOCATION = "flink.manifests.location";
@@ -36,23 +39,25 @@ class ManifestOutputFileFactory {
     private final FileIO io;
     private final Map<String, String> props;
     private final String flinkJobId;
+    private final String operatorUniqueId;
     private final int subTaskId;
     private final long attemptNumber;
     private final AtomicInteger fileCount = new AtomicInteger(0);
 
     ManifestOutputFileFactory(TableOperations ops, FileIO io, Map<String, String> props,
-            String flinkJobId, int subTaskId, long attemptNumber) {
+            String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) {
         this.ops = ops;
         this.io = io;
         this.props = props;
         this.flinkJobId = flinkJobId;
+        this.operatorUniqueId = operatorUniqueId;
         this.subTaskId = subTaskId;
         this.attemptNumber = attemptNumber;
     }
 
     private String generatePath(long checkpointId) {
-        return FileFormat.AVRO.addExtension(String.format("%s-%05d-%d-%d-%05d", flinkJobId, subTaskId,
-                attemptNumber, checkpointId, fileCount.incrementAndGet()));
+        return FileFormat.AVRO.addExtension(String.format("%s-%s-%05d-%d-%d-%05d", flinkJobId, operatorUniqueId,
+                subTaskId, attemptNumber, checkpointId, fileCount.incrementAndGet()));
     }
 
     OutputFile create(long checkpointId) {
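
As the comment in this factory notes, `flink.manifests.location` is an optional table property. A hedged sketch of setting it through the standard Iceberg API (the path is a placeholder; `table` is assumed to be loaded already):

    import org.apache.iceberg.Table;

    // Sketch: point the temporary flink manifest files at a custom directory.
    static void setManifestLocation(Table table) {
        table.updateProperties()
                .set("flink.manifests.location", "hdfs://nameservice1/tmp/flink-manifests")  // placeholder path
                .commit();
    }
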
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionKeySelector.java
similarity index 95%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionKeySelector.java
index 1c2cf61b1..bc4286bbb 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionKeySelector.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.inlong.sort.iceberg.sink;
+package org.apache.inlong.sort.iceberg.flink.sink;
 
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.table.data.RowData;
@@ -31,6 +31,8 @@ import org.apache.iceberg.flink.sink.FlinkSink;
 /**
  * Create a {@link KeySelector} to shuffle by partition key, so that each partition/bucket will be written by only
  * one task. That reduces the number of small files created by the partitioned fanout write policy of {@link FlinkSink}.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
 class PartitionKeySelector implements KeySelector<RowData, String> {
 
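To make the javadoc concrete: the selector is meant to be used with keyBy so every row of a given partition lands on the same writer subtask. A sketch, assuming the constructor takes the table's PartitionSpec, Schema and Flink RowType as in the upstream Iceberg sink, and assuming same-package access since the class is package-private:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.iceberg.Table;

    // Sketch: shuffle rows by partition before the Iceberg writer.
    static DataStream<RowData> shuffleByPartition(DataStream<RowData> input, Table table, RowType flinkRowType) {
        return input.keyBy(new PartitionKeySelector(table.spec(), table.schema(), flinkRowType));
    }
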
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionedDeltaWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionedDeltaWriter.java
new file mode 100644
index 000000000..a482e0ba2
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionedDeltaWriter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
+
+  private final PartitionKey partitionKey;
+
+  private final Map<PartitionKey, RowDataDeltaWriter> writers = Maps.newHashMap();
+
+  PartitionedDeltaWriter(PartitionSpec spec,
+                         FileFormat format,
+                         FileAppenderFactory<RowData> appenderFactory,
+                         OutputFileFactory fileFactory,
+                         FileIO io,
+                         long targetFileSize,
+                         Schema schema,
+                         RowType flinkSchema,
+                         List<Integer> equalityFieldIds,
+                         boolean upsert) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+        upsert);
+    this.partitionKey = new PartitionKey(spec, schema);
+  }
+
+  @Override
+  RowDataDeltaWriter route(RowData row) {
+    partitionKey.partition(wrapper().wrap(row));
+
+    RowDataDeltaWriter writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: copy the partition key, otherwise later mutations would corrupt the keys stored in writers.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RowDataDeltaWriter(copiedKey);
+      writers.put(copiedKey, writer);
+    }
+
+    return writer;
+  }
+
+  @Override
+  public void close() {
+    try {
+      Tasks.foreach(writers.values())
+          .throwFailureWhenFinished()
+          .noRetry()
+          .run(RowDataDeltaWriter::close, IOException.class);
+
+      writers.clear();
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close equality delta writer", e);
+    }
+  }
+}
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
new file mode 100644
index 000000000..36240760f
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitionedFanoutWriter;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ArrayUtil;
+
+import java.util.List;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ * Add an option `sink.ignore.changelog` to support insert-only mode without equalityFieldIds.
+ */
+public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
+    private final Table table;
+    private final Schema schema;
+    private final RowType flinkSchema;
+    private final PartitionSpec spec;
+    private final FileIO io;
+    private final long targetFileSizeBytes;
+    private final FileFormat format;
+    private final List<Integer> equalityFieldIds;
+    private final boolean upsert;
+    private final boolean appendMode;
+    private final FileAppenderFactory<RowData> appenderFactory;
+
+    private transient OutputFileFactory outputFileFactory;
+
+    public RowDataTaskWriterFactory(Table table,
+            RowType flinkSchema,
+            long targetFileSizeBytes,
+            FileFormat format,
+            List<Integer> equalityFieldIds,
+            boolean upsert,
+            boolean appendMode) {
+        this.table = table;
+        this.schema = table.schema();
+        this.flinkSchema = flinkSchema;
+        this.spec = table.spec();
+        this.io = table.io();
+        this.targetFileSizeBytes = targetFileSizeBytes;
+        this.format = format;
+        this.equalityFieldIds = equalityFieldIds;
+        this.upsert = upsert;
+        this.appendMode = appendMode;
+
+        if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
+            this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+        } else {
+            // TODO provide the ability to customize the equality-delete row schema.
+            this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
+                    ArrayUtil.toIntArray(equalityFieldIds), schema, null);
+        }
+    }
+
+    @Override
+    public void initialize(int taskId, int attemptId) {
+        this.outputFileFactory = OutputFileFactory.builderFor(table, taskId, attemptId).build();
+    }
+
+    @Override
+    public TaskWriter<RowData> create() {
+        Preconditions.checkNotNull(outputFileFactory,
+                "The outputFileFactory shouldn't be null if we have invoked the initialize().");
+
+        if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
+            // Initialize a task writer to write INSERT only.
+            if (spec.isUnpartitioned()) {
+                return new UnpartitionedWriter<>(
+                        spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes);
+            } else {
+                return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
+                        io, targetFileSizeBytes, schema, flinkSchema);
+            }
+        } else {
+            // Initialize a task writer to write both INSERT and equality DELETE.
+            if (spec.isUnpartitioned()) {
+                return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
+                        targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
+            } else {
+                return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
+                        targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
+            }
+        }
+    }
+
+    private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter<RowData> {
+
+        private final PartitionKey partitionKey;
+        private final RowDataWrapper rowDataWrapper;
+
+        RowDataPartitionedFanoutWriter(
+                PartitionSpec spec, FileFormat format, FileAppenderFactory<RowData> appenderFactory,
+                OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema,
+                RowType flinkSchema) {
+            super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+            this.partitionKey = new PartitionKey(spec, schema);
+            this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+        }
+
+        @Override
+        protected PartitionKey partition(RowData row) {
+            partitionKey.partition(rowDataWrapper.wrap(row));
+            return partitionKey;
+        }
+    }
+}
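
To illustrate the appendMode flag added to this copied factory: when it is true (or no equality field ids are given) the factory produces plain INSERT-only writers, otherwise it produces the delta writers. A minimal sketch, with the target file size and file format chosen arbitrarily:

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.sink.TaskWriterFactory;
    import java.util.Collections;

    // Sketch: build an insert-only writer factory (appendMode = true), so changelog
    // semantics and equality deletes are ignored.
    static TaskWriterFactory<RowData> insertOnlyFactory(Table table, RowType flinkRowType) {
        long targetFileSizeBytes = 128L * 1024 * 1024;  // placeholder target size
        return new RowDataTaskWriterFactory(table, flinkRowType, targetFileSizeBytes,
                FileFormat.PARQUET, Collections.emptyList(), false /* upsert */, true /* appendMode */);
    }
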
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/UnpartitionedDeltaWriter.java
new file mode 100644
index 000000000..9e8e53999
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/UnpartitionedDeltaWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
+  private final RowDataDeltaWriter writer;
+
+  UnpartitionedDeltaWriter(PartitionSpec spec,
+                           FileFormat format,
+                           FileAppenderFactory<RowData> appenderFactory,
+                           OutputFileFactory fileFactory,
+                           FileIO io,
+                           long targetFileSize,
+                           Schema schema,
+                           RowType flinkSchema,
+                           List<Integer> equalityFieldIds,
+                           boolean upsert) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+        upsert);
+    this.writer = new RowDataDeltaWriter(null);
+  }
+
+  @Override
+  RowDataDeltaWriter route(RowData row) {
+    return writer;
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+}
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/iceberg-dlc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..db2caa084
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+org.apache.inlong.sort.iceberg.flink.FlinkDynamicTableFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
index 2056a8d02..e81a5552d 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
@@ -89,6 +89,8 @@ import java.util.stream.Collectors;
  * </p>
  * The Iceberg table manages its partitions by itself. The partition of the Iceberg table is independent of the
  * partition of Flink.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
 public class FlinkCatalog extends AbstractCatalog {
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
index f4bb7d49f..6a02dc81e 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
@@ -57,6 +57,8 @@ import java.util.Map;
  * </p>
  * To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override
  * {@link #createCatalogLoader(String, Map, Configuration)}.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
 public class FlinkCatalogFactory implements CatalogFactory {
 
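The javadoc above points at createCatalogLoader(String, Map, Configuration) as the extension hook for catalogs other than Hive or Hadoop. A rough sketch of that extension, assuming the method is protected and returns a CatalogLoader as in upstream Iceberg, and using a placeholder catalog implementation class name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.flink.CatalogLoader;
    import java.util.Map;

    // Sketch: register a custom catalog implementation by class name.
    public class MyCatalogFactory extends FlinkCatalogFactory {
        @Override
        protected CatalogLoader createCatalogLoader(String name, Map<String, String> properties,
                Configuration hadoopConf) {
            return CatalogLoader.custom(name, properties, hadoopConf, "com.example.MyCatalog");  // placeholder impl
        }
    }
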
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index e10626148..f12bd1b56 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -52,7 +52,7 @@ import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
 
 /**
- * Copy from org.apache.iceberg.flink:iceberg-flink-runtime-1.13:0.13.1
+ * Copy from org.apache.iceberg.flink:iceberg-flink-runtime-1.13:0.13.2
  *
  * <p>
  * Factory for creating configured instances of {@link IcebergTableSource} and {@link
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index b5f3bef9e..088622278 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -37,11 +37,17 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
 
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ * Add an option `sink.ignore.changelog` to support insert-only mode without a primary key.
+ * Add options `inlong.metric` and `inlong.audit` to support collecting InLong metrics and audit data.
+ */
 public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
 
     private static final Logger LOG = LoggerFactory.getLogger(IcebergTableSink.class);
@@ -80,6 +86,9 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
                 .tableSchema(tableSchema)
                 .equalityFieldColumns(equalityColumns)
                 .overwrite(overwrite)
+                .appendMode(Boolean.valueOf(
+                        Optional.ofNullable(catalogTable.getOptions().get(ICEBERG_IGNORE_ALL_CHANGELOG.key()))
+                                .orElse(ICEBERG_IGNORE_ALL_CHANGELOG.defaultValue().toString())))
                 .metric(catalogTable.getOptions().getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()),
                         catalogTable.getOptions().getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()))
                 .append();
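
A hedged example of how these options could surface in a Flink SQL DDL. The connector identifier, catalog options and option values below are placeholders; only the keys `sink.ignore.changelog`, `inlong.metric` and `inlong.audit` are taken from this class, and the metric value is assumed to carry group id, stream id and node id:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // Sketch: create an Iceberg sink table in insert-only mode with InLong metrics enabled.
    public class IcebergSinkDdlSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            tEnv.executeSql("CREATE TABLE iceberg_sink (id INT, name STRING) WITH ("
                    + " 'connector' = 'iceberg',"                      // placeholder identifier
                    + " 'sink.ignore.changelog' = 'true',"             // appendMode: drop changelog semantics
                    + " 'inlong.metric' = 'groupId&streamId&nodeId',"  // assumed value format
                    + " 'inlong.audit' = '127.0.0.1:10081'"            // placeholder audit endpoint
                    + ")");                                            // plus the usual iceberg catalog options
        }
    }
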
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/BaseDeltaTaskWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/BaseDeltaTaskWriter.java
new file mode 100644
index 000000000..40b174cd8
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/BaseDeltaTaskWriter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.data.RowDataProjection;
+import org.apache.iceberg.io.BaseTaskWriter;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
+
+    private final Schema schema;
+    private final Schema deleteSchema;
+    private final RowDataWrapper wrapper;
+    private final RowDataWrapper keyWrapper;
+    private final RowDataProjection keyProjection;
+    private final boolean upsert;
+
+    BaseDeltaTaskWriter(PartitionSpec spec,
+            FileFormat format,
+            FileAppenderFactory<RowData> appenderFactory,
+            OutputFileFactory fileFactory,
+            FileIO io,
+            long targetFileSize,
+            Schema schema,
+            RowType flinkSchema,
+            List<Integer> equalityFieldIds,
+            boolean upsert) {
+        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+        this.schema = schema;
+        this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
+        this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+        this.upsert = upsert;
+        this.keyWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
+        this.keyProjection = RowDataProjection.create(schema, deleteSchema);
+    }
+
+    abstract RowDataDeltaWriter route(RowData row);
+
+    RowDataWrapper wrapper() {
+        return wrapper;
+    }
+
+    @Override
+    public void write(RowData row) throws IOException {
+        RowDataDeltaWriter writer = route(row);
+
+        switch (row.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                if (upsert) {
+                    writer.deleteKey(keyProjection.wrap(row));
+                }
+                writer.write(row);
+                break;
+
+            case UPDATE_BEFORE:
+                if (upsert) {
+                    break;  // UPDATE_BEFORE is not needed for UPDATE; skip it to avoid deleting the same row twice
+                }
+                writer.delete(row);
+                break;
+            case DELETE:
+                writer.delete(row);
+                break;
+
+            default:
+                throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
+        }
+    }
+
+    protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
+        RowDataDeltaWriter(PartitionKey partition) {
+            super(partition, schema, deleteSchema);
+        }
+
+        @Override
+        protected StructLike asStructLike(RowData data) {
+            return wrapper.wrap(data);
+        }
+
+        @Override
+        protected StructLike asStructLikeKey(RowData data) {
+            return keyWrapper.wrap(data);
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
index 6b52dff8b..dae992042 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
@@ -25,6 +25,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 import java.util.List;
 
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
 class DeltaManifests {
 
     private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0];
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
index fb2187d76..1ae2bcbe5 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
@@ -30,6 +30,9 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
 class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
     private static final int VERSION_1 = 1;
     private static final int VERSION_2 = 2;
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
index 1cbe0d70b..46d7beec9 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
@@ -38,6 +38,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.function.Supplier;
 
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
 class FlinkManifestUtil {
     private static final int FORMAT_V2 = 2;
     private static final Long DUMMY_SNAPSHOT_ID = 0L;
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index d86857338..65929bc34 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -42,7 +42,6 @@ import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
 import org.apache.iceberg.flink.sink.TaskWriterFactory;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.WriteResult;
@@ -69,6 +68,11 @@ import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
 
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ * Add an option `sink.ignore.changelog` to support insert-only mode without a primary key.
+ * Add options `inlong.metric` and `inlong.audit` to support collecting InLong metrics and audit data.
+ */
 public class FlinkSink {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
 
@@ -129,6 +133,7 @@ public class FlinkSink {
         private Table table;
         private TableSchema tableSchema;
         private boolean overwrite = false;
+        private boolean appendMode = false;
         private DistributionMode distributionMode = null;
         private Integer writeParallelism = null;
         private boolean upsert = false;
@@ -196,6 +201,17 @@ public class FlinkSink {
             return this;
         }
 
+        /**
+         * The appendMode property is used to insert data without equality field columns.
+         *
+         * @param appendMode whether to write in append-only mode.
+         * @return {@link FlinkSink.Builder} to connect the iceberg table.
+         */
+        public FlinkSink.Builder appendMode(boolean appendMode) {
+            this.appendMode = appendMode;
+            return this;
+        }
+
         /**
          * Add metric output for iceberg writer
          * @param inlongMetric
@@ -381,8 +397,9 @@ public class FlinkSink {
             }
 
             // Fallback to use upsert mode parsed from table properties if don't specify in job level.
-            boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
-                    UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
+            // Upsert is only valid when appendMode is disabled.
+            boolean upsertMode = (upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+                    UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT)) && !appendMode;
 
             // Validate the equality fields and partition fields if we enable the upsert mode.
             if (upsertMode) {
@@ -400,7 +417,7 @@ public class FlinkSink {
             }
 
             IcebergStreamWriter<RowData> streamWriter = createStreamWriter(
-                    table, flinkRowType, equalityFieldIds, upsertMode, inlongMetric, auditHostAndPorts);
+                    table, flinkRowType, equalityFieldIds, upsertMode, appendMode, inlongMetric, auditHostAndPorts);
 
             int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
             SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -474,6 +491,7 @@ public class FlinkSink {
             RowType flinkRowType,
             List<Integer> equalityFieldIds,
             boolean upsert,
+            boolean appendMode,
             String inlongMetric,
             String auditHostAndPorts) {
         Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
@@ -484,7 +502,7 @@ public class FlinkSink {
         Table serializableTable = SerializableTable.copyOf(table);
         TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
                 serializableTable, flinkRowType, targetFileSize,
-                fileFormat, equalityFieldIds, upsert);
+                fileFormat, equalityFieldIds, upsert, appendMode);
 
         return new IcebergStreamWriter<>(table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts);
     }
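
Putting the builder changes together, a sketch of how a job could enable the new flag. forRowData, table and tableLoader are the upstream builder entry points and are assumed unchanged in this copy; the metric and audit values are placeholders:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.TableLoader;

    // Sketch: append-only Iceberg sink with InLong metrics.
    static void buildSink(DataStream<RowData> input, Table table, TableLoader tableLoader) {
        FlinkSink.forRowData(input)
                .table(table)
                .tableLoader(tableLoader)
                .appendMode(true)                                      // write INSERT only, ignore changelog
                .metric("groupId&streamId&nodeId", "127.0.0.1:10081")  // assumed metric/audit values
                .append();
    }
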
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
index 8e5315cad..7942ef77c 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
@@ -59,6 +59,9 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.SortedMap;
 
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
 class IcebergFilesCommitter extends AbstractStreamOperator<Void>
         implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index 75eca46c5..79df9049f 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -36,6 +36,9 @@ import java.io.IOException;
 
 import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
 class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
         implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
index f13ff4237..97f4f3f02 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
@@ -28,6 +28,9 @@ import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
 class ManifestOutputFileFactory {
     // Users could define their own flink manifests directory by setting this value in table properties.
     static final String FLINK_MANIFEST_LOCATION = "flink.manifests.location";
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java
index 1c2cf61b1..cd0442b9a 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java
@@ -31,6 +31,8 @@ import org.apache.iceberg.flink.sink.FlinkSink;
 /**
  * Create a {@link KeySelector} to shuffle by partition key, so that each partition/bucket will be written by only
  * one task. That reduces the number of small files created by the partitioned fanout write policy of {@link FlinkSink}.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
 class PartitionKeySelector implements KeySelector<RowData, String> {
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionedDeltaWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionedDeltaWriter.java
new file mode 100644
index 000000000..230e40f30
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionedDeltaWriter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
+
+  private final PartitionKey partitionKey;
+
+  private final Map<PartitionKey, RowDataDeltaWriter> writers = Maps.newHashMap();
+
+  PartitionedDeltaWriter(PartitionSpec spec,
+                         FileFormat format,
+                         FileAppenderFactory<RowData> appenderFactory,
+                         OutputFileFactory fileFactory,
+                         FileIO io,
+                         long targetFileSize,
+                         Schema schema,
+                         RowType flinkSchema,
+                         List<Integer> equalityFieldIds,
+                         boolean upsert) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+        upsert);
+    this.partitionKey = new PartitionKey(spec, schema);
+  }
+
+  @Override
+  RowDataDeltaWriter route(RowData row) {
+    partitionKey.partition(wrapper().wrap(row));
+
+    RowDataDeltaWriter writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: copy the partition key, otherwise later mutations would corrupt the keys stored in writers.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RowDataDeltaWriter(copiedKey);
+      writers.put(copiedKey, writer);
+    }
+
+    return writer;
+  }
+
+  @Override
+  public void close() {
+    try {
+      Tasks.foreach(writers.values())
+          .throwFailureWhenFinished()
+          .noRetry()
+          .run(RowDataDeltaWriter::close, IOException.class);
+
+      writers.clear();
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close equality delta writer", e);
+    }
+  }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
new file mode 100644
index 000000000..831ef6a4a
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitionedFanoutWriter;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ArrayUtil;
+
+import java.util.List;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ * Add an option `sink.ignore.changelog` to support insert-only mode without equalityFieldIds.
+ */
+public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
+    private final Table table;
+    private final Schema schema;
+    private final RowType flinkSchema;
+    private final PartitionSpec spec;
+    private final FileIO io;
+    private final long targetFileSizeBytes;
+    private final FileFormat format;
+    private final List<Integer> equalityFieldIds;
+    private final boolean upsert;
+    private final boolean appendMode;
+    private final FileAppenderFactory<RowData> appenderFactory;
+
+    private transient OutputFileFactory outputFileFactory;
+
+    public RowDataTaskWriterFactory(Table table,
+            RowType flinkSchema,
+            long targetFileSizeBytes,
+            FileFormat format,
+            List<Integer> equalityFieldIds,
+            boolean upsert,
+            boolean appendMode) {
+        this.table = table;
+        this.schema = table.schema();
+        this.flinkSchema = flinkSchema;
+        this.spec = table.spec();
+        this.io = table.io();
+        this.targetFileSizeBytes = targetFileSizeBytes;
+        this.format = format;
+        this.equalityFieldIds = equalityFieldIds;
+        this.upsert = upsert;
+        this.appendMode = appendMode;
+
+        if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
+            this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+        } else {
+            // TODO provide the ability to customize the equality-delete row schema.
+            this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
+                    ArrayUtil.toIntArray(equalityFieldIds), schema, null);
+        }
+    }
+
+    @Override
+    public void initialize(int taskId, int attemptId) {
+        this.outputFileFactory = OutputFileFactory.builderFor(table, taskId, attemptId).build();
+    }
+
+    @Override
+    public TaskWriter<RowData> create() {
+        Preconditions.checkNotNull(outputFileFactory,
+                "The outputFileFactory shouldn't be null if we have invoked the initialize().");
+
+        if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
+            // Initialize a task writer to write INSERT only.
+            if (spec.isUnpartitioned()) {
+                return new UnpartitionedWriter<>(
+                        spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes);
+            } else {
+                return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
+                        io, targetFileSizeBytes, schema, flinkSchema);
+            }
+        } else {
+            // Initialize a task writer to write both INSERT and equality DELETE.
+            if (spec.isUnpartitioned()) {
+                return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
+                        targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
+            } else {
+                return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
+                        targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
+            }
+        }
+    }
+
+    private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter<RowData> {
+
+        private final PartitionKey partitionKey;
+        private final RowDataWrapper rowDataWrapper;
+
+        RowDataPartitionedFanoutWriter(
+                PartitionSpec spec, FileFormat format, FileAppenderFactory<RowData> appenderFactory,
+                OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema,
+                RowType flinkSchema) {
+            super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+            this.partitionKey = new PartitionKey(spec, schema);
+            this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+        }
+
+        @Override
+        protected PartitionKey partition(RowData row) {
+            partitionKey.partition(rowDataWrapper.wrap(row));
+            return partitionKey;
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/UnpartitionedDeltaWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/UnpartitionedDeltaWriter.java
new file mode 100644
index 000000000..c88724880
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/UnpartitionedDeltaWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
+  private final RowDataDeltaWriter writer;
+
+  UnpartitionedDeltaWriter(PartitionSpec spec,
+                           FileFormat format,
+                           FileAppenderFactory<RowData> appenderFactory,
+                           OutputFileFactory fileFactory,
+                           FileIO io,
+                           long targetFileSize,
+                           Schema schema,
+                           RowType flinkSchema,
+                           List<Integer> equalityFieldIds,
+                           boolean upsert) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+        upsert);
+    this.writer = new RowDataDeltaWriter(null);
+  }
+
+  @Override
+  RowDataDeltaWriter route(RowData row) {
+    return writer;
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+}
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index aabd3e7ce..4dc99082b 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -96,13 +96,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-connector-hive</artifactId>
+            <artifactId>sort-connector-hbase</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-connector-hbase</artifactId>
+            <artifactId>sort-connector-hive</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
@@ -112,6 +112,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-iceberg-dlc</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>sort-connector-elasticsearch6</artifactId>
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DLCIcebergSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DLCIcebergSqlParseTest.java
index 5857fc70c..739c507ce 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DLCIcebergSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DLCIcebergSqlParseTest.java
@@ -60,8 +60,8 @@ public class DLCIcebergSqlParseTest {
                 null,
                 map,
                 "id",
-                Collections.singletonList("test"),
-                "localhost",
+                Collections.singletonList("mysql_input"),
+                "127.0.0.1",
                 "root",
                 "123456",
                 "inlong",
@@ -74,13 +74,13 @@ public class DLCIcebergSqlParseTest {
     private DLCIcebergLoadNode buildDLCLoadNode() {
         // set HIVE_CONF_DIR,or set uri and warehouse
         Map<String, String> properties = new HashMap<>();
+
         properties.put(DLCConstant.DLC_REGION, "ap-beijing");
         properties.put(DLCConstant.DLC_SECRET_ID, "XXXXXXXXXXX");
         properties.put(DLCConstant.DLC_SECRET_KEY, "XXXXXXXXXXX");
+        properties.put(DLCConstant.DLC_USER_APPID, "XXXXXXXXXXX");
+        properties.put(DLCConstant.DLC_MANAGED_ACCOUNT_UID, "XXXXXXXXXXX");
 
-        properties.put(DLCConstant.FS_COS_REGION, "ap-beijing");
-        properties.put(DLCConstant.FS_COS_SECRET_ID, "XXXXXXXXXXX");
-        properties.put(DLCConstant.FS_COS_SECRET_KEY, "XXXXXXXXXXX");
         List<FieldRelation> relations = Arrays
                 .asList(new FieldRelation(new FieldInfo("id", new IntFormatInfo()),
                                 new FieldInfo("id", new IntFormatInfo())),
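
The test now authenticates purely through the DLC account: the per-filesystem FS_COS_* credentials are dropped and DLC_USER_APPID plus DLC_MANAGED_ACCOUNT_UID are supplied alongside the region and secret pair. A minimal, self-contained sketch of that option set follows; the placeholder values and the helper class are illustrative, and the DLCConstant import path is an assumption.

    // Sketch of the DLC auth options exercised by the updated test.
    // Placeholder values only; the import path for DLCConstant is an assumption.
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.inlong.sort.protocol.constant.DLCConstant;

    public final class DlcAuthOptionsExample {

        private DlcAuthOptionsExample() {
        }

        /** Builds the property map required by the unified DLC auth method. */
        public static Map<String, String> dlcAuthOptions() {
            Map<String, String> properties = new HashMap<>();
            properties.put(DLCConstant.DLC_REGION, "ap-beijing");
            properties.put(DLCConstant.DLC_SECRET_ID, "<secret-id>");
            properties.put(DLCConstant.DLC_SECRET_KEY, "<secret-key>");
            properties.put(DLCConstant.DLC_USER_APPID, "<appid>");
            properties.put(DLCConstant.DLC_MANAGED_ACCOUNT_UID, "<managed-account-uid>");
            // The FS_COS_REGION / FS_COS_SECRET_ID / FS_COS_SECRET_KEY entries removed
            // by this change are intentionally absent.
            return properties;
        }
    }
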
diff --git a/licenses/inlong-manager/LICENSE b/licenses/inlong-manager/LICENSE
index 41449fc11..73687e2ac 100644
--- a/licenses/inlong-manager/LICENSE
+++ b/licenses/inlong-manager/LICENSE
@@ -623,11 +623,11 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
   org.apache.hadoop:hadoop-yarn-api:2.10.1 - Apache Hadoop YARN API (https://github.com/apache/hadoop/tree/branch-2.10.1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api), (Apache License, Version 2.0)
   org.apache.hadoop:hadoop-yarn-client:2.10.1 - Apache Hadoop YARN Client (https://github.com/apache/hadoop/tree/branch-2.10.1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client), (Apache License, Version 2.0)
   com.carrotsearch:hppc:0.7.2 - HPPC Collections (https://github.com/carrotsearch/hppc/tree/0.7.2), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-api:0.13.1 - Apace Iceberg API (https://https://iceberg.apache.org/), (Apache License, Version 2.0)
-  org.apache.iceberg:iceberg-bundled-guava:0.13.1 - Apache Iceberg Bundled-guava (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-common:0.13.1 - Apache Iceberg Common (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-core:0.13.1 - Apace Iceberg Core (https://https://iceberg.apache.org/), (Apache License, Version 2.0)
-  org.apache.iceberg:iceberg-hive-metastore:0.13.1 - Apace Iceberg Hive Metastore (https://https://iceberg.apache.org/), (Apache License, Version 2.0)
+  org.apache.iceberg:iceberg-api:0.13.2 - Apache Iceberg API (https://iceberg.apache.org/), (Apache License, Version 2.0)
+  org.apache.iceberg:iceberg-bundled-guava:0.13.2 - Apache Iceberg Bundled-guava (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-common:0.13.2 - Apache Iceberg Common (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-core:0.13.2 - Apache Iceberg Core (https://iceberg.apache.org/), (Apache License, Version 2.0)
+  org.apache.iceberg:iceberg-hive-metastore:0.13.2 - Apache Iceberg Hive Metastore (https://iceberg.apache.org/), (Apache License, Version 2.0)
   org.codehaus.jackson:jackson-core-asl:1.9.13 - Jackson (https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-core-asl/1.9.13), (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.2 - Jackson-dataformat-YAML (https://github.com/FasterXML/jackson-dataformat-yaml/tree/jackson-dataformat-yaml-2.13.2), (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.2 - Jackson datatype: JSR310 (https://github.com/FasterXML/jackson-modules-java8/tree/jackson-modules-java8-2.13.2), (The Apache Software License, Version 2.0)
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 7c823b0af..980c471b7 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -503,7 +503,35 @@
        inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkConfigOptions.java
        inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
        inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
-  Source  : iceberg-flink-runtime-1.13 1.13.5 (Please note that the software have been modified.)
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/BaseDeltaTaskWriter.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionedDeltaWriter.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
+       inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/UnpartitionedDeltaWriter.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalog.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalogFactory.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/BaseDeltaTaskWriter.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifests.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifestsSerializer.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkManifestUtil.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergFilesCommitter.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/ManifestOutputFileFactory.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionedDeltaWriter.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionKeySelector.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
+       inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/UnpartitionedDeltaWriter.java
+  Source  : iceberg-flink:iceberg-flink-1.13:0.13.2 (Please note that the software has been modified.)
   License : https://github.com/apache/iceberg/LICENSE
 
  1.3.7 inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/AbstractJdbcDialect.java
@@ -542,6 +570,7 @@
   Source  : flink-table-runtime-blink_2.11-13.2-rc2 2.2.1 (Please note that the software have been modified.)
   License : https://github.com/apache/flink/blob/release-1.13.2-rc2/LICENSE
 
+
 =======================================================================
 Apache InLong Subcomponents:
 
@@ -771,7 +800,7 @@ The text of each license is the standard Apache 2.0 license.
   org.ini4j:ini4j:0.5.1 - ini4j (https://sourceforge.net/projects/ini4j), (The Apache Software License, Version 2.0)
   com.squareup.okhttp:logging-interceptor:2.7.5 - logging-interceptor (https://github.com/square/okhttp/tree/master/okhttp-logging-interceptor), (The Apache Software License, Version 2.0)
   com.tencentcloudapi:tencentcloud-sdk-java:3.1.545 - tencentcloud-sdk-java (https://github.com/TencentCloud/tencentcloud-sdk-java), (The Apache Software License, Version 2.0)
-  com.qcloud:dlc-data-catalog-metastore-client:1.1 - dlc-data-catalog-metastore-client (https://mvnrepository.com/artifact/com.qcloud/dlc-data-catalog-metastore-client/1.1), (The Apache Software License, Version 2.0)
+  com.qcloud:dlc-data-catalog-metastore-client:1.1.1 - dlc-data-catalog-metastore-client (https://mvnrepository.com/artifact/com.qcloud/dlc-data-catalog-metastore-client/1.1.1), (The Apache Software License, Version 2.0)
   org.apache.doris:flink-doris-connector-1.13_2.11:1.0.3 - Flink Connector for Apache Doris (https://github.com/apache/doris-flink-connector/tree/1.13_2.11-1.0.3), (The Apache Software License, Version 2.0)
 
 
@@ -804,16 +833,16 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
   com.carrotsearch:hppc:0.7.1 - HPPC Collections (https://github.com/carrotsearch/hppc/tree/0.7.1), (The Apache Software License, Version 2.0)
   com.carrotsearch:hppc:0.7.2 - HPPC Collections (https://github.com/carrotsearch/hppc/tree/0.7.2), (The Apache Software License, Version 2.0)
   com.carrotsearch:hppc:0.8.1 - HPPC Collections (https://github.com/carrotsearch/hppc/tree/0.8.1), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-api:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-bundled-guava:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-common:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-core:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-data:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-flink-runtime-1.13:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-hive-metastore:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-orc:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-parquet:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
-  org.apache.iceberg:iceberg-flink-runtime-1.13:jar:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-api:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-bundled-guava:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-common:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-core:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-data:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-flink-runtime-1.13:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-hive-metastore:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-orc:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-parquet:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+  org.apache.iceberg:iceberg-flink-runtime-1.13:jar:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
   org.codehaus.jackson:jackson-core-asl:1.9.13 - Jackson (https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-core-asl/1.9.13), (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.2 - Jackson-dataformat-YAML (https://github.com/FasterXML/jackson-dataformat-yaml/tree/jackson-dataformat-yaml-2.13.2), (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.2 - Jackson datatype: JSR310 (https://github.com/FasterXML/jackson-modules-java8/tree/jackson-modules-java8-2.13.2), (The Apache Software License, Version 2.0)
diff --git a/pom.xml b/pom.xml
index 35c0423f0..a96c42a46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -201,8 +201,7 @@
         <pulsar.version>2.8.1</pulsar.version>
         <pulsar.testcontainers.version>1.15.3</pulsar.testcontainers.version>
         <kafka.version>2.4.1</kafka.version>
-        <iceberg.flink.version>0.13.1</iceberg.flink.version>
-        <iceberg.version>0.13.1</iceberg.version>
+        <iceberg.version>0.13.2</iceberg.version>
         <flink.version>1.13.5</flink.version>
         <flink.minor.version>1.13</flink.minor.version>
         <flink.connector.mysql.cdc.version>2.2.1</flink.connector.mysql.cdc.version>
@@ -249,7 +248,9 @@
         <tencentcloud.api.version>3.1.545</tencentcloud.api.version>
         <cos.hadoop.version>2.7.5-5.9.3</cos.hadoop.version>
         <cos.bundle.version>5.6.35</cos.bundle.version>
-        <dlc.client.version>1.1</dlc.client.version>
+        <dlc.client.version>1.1.1</dlc.client.version>
+        <dlc.jdbc.version>2.2.6</dlc.jdbc.version>
+        <cos.lakefs.plugin.version>1.0</cos.lakefs.plugin.version>
         <esri-geometry-api.version>2.0.0</esri-geometry-api.version>
         <HikariCP.version>4.0.3</HikariCP.version>
         <caffeine.version>2.9.3</caffeine.version>
@@ -1033,7 +1034,7 @@
             <dependency>
                 <groupId>org.apache.iceberg</groupId>
                 <artifactId>iceberg-flink-runtime-1.13</artifactId>
-                <version>${iceberg.flink.version}</version>
+                <version>${iceberg.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
@@ -1570,6 +1571,16 @@
                 <artifactId>dlc-data-catalog-metastore-client</artifactId>
                 <version>${dlc.client.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.qcloud.cos</groupId>
+                <artifactId>lakefs-cloud-plugin</artifactId>
+                <version>${cos.lakefs.plugin.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.qcloud</groupId>
+                <artifactId>dlc-jdbc</artifactId>
+                <version>${dlc.jdbc.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.github.ben-manes.caffeine</groupId>
                 <artifactId>caffeine</artifactId>
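
The root POM also consolidates the two Iceberg version properties into a single iceberg.version and introduces managed versions for the DLC JDBC driver and the COS lakefs plugin. For the new dlc-jdbc dependency, usage follows standard JDBC; the sketch below is hypothetical, and the URL format, credentials, and query are placeholders to be replaced with values from the DLC JDBC documentation.

    // Hypothetical JDBC usage sketch for the newly managed dlc-jdbc dependency.
    // The URL format and query below are placeholders, not documented DLC syntax.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    public final class DlcJdbcSmokeTest {

        private DlcJdbcSmokeTest() {
        }

        public static void main(String[] args) throws SQLException {
            // Placeholder connection string; region and credentials come from DLC.
            String url = "jdbc:dlc:<endpoint>?region=<region>&secret_id=<id>&secret_key=<key>";
            try (Connection connection = DriverManager.getConnection(url);
                    Statement statement = connection.createStatement();
                    ResultSet resultSet = statement.executeQuery("SELECT 1")) {
                while (resultSet.next()) {
                    System.out.println(resultSet.getInt(1));
                }
            }
        }
    }
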