You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/14 12:07:11 UTC

[incubator-inlong] branch master updated: [INLONG-4431][Sort] Sort lightweight support load data to DLC (#4655)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 272774e54 [INLONG-4431][Sort] Sort lightweight support load data to DLC (#4655)
272774e54 is described below

commit 272774e549b2c7a6366157cb907748b1ffae62db
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Tue Jun 14 20:07:04 2022 +0800

    [INLONG-4431][Sort] Sort lightweight support load data to DLC (#4655)
---
 inlong-sort/pom.xml                                |   1 +
 .../inlong/sort/protocol/constant/DLCConstant.java |  71 +++
 .../apache/inlong/sort/protocol/node/LoadNode.java |   4 +-
 .../org/apache/inlong/sort/protocol/node/Node.java |   4 +-
 .../protocol/node/load/DLCIcebergLoadNode.java     | 134 +++++
 .../protocol/node/load/DLCIcebergLoadNodeTest.java |  61 +++
 inlong-sort/sort-connectors/iceberg-dlc/pom.xml    | 138 +++++
 .../iceberg/catalog/hybris/CachedClientPool.java   |  86 ++++
 .../catalog/hybris/DLCWrappedHybrisClientPool.java | 106 ++++
 .../catalog/hybris/DlcWrappedHybrisCatalog.java    | 573 +++++++++++++++++++++
 .../catalog/hybris/HiveTableOperations.java        | 560 ++++++++++++++++++++
 inlong-sort/sort-connectors/pom.xml                |   2 +
 .../inlong/sort/parser/DLCIcebergSqlParseTest.java | 147 ++++++
 licenses/inlong-sort-connectors/LICENSE            |   1 +
 .../licenses/LICENSE-cos_api-bundle.txt            | 202 ++++++++
 .../notices/NOTICE-cos_api-bundle.txt              |   8 +
 pom.xml                                            |  29 ++
 17 files changed, 2125 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index 8cc3a51f8..48503587d 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -46,6 +46,7 @@
         <kafka.clients.version>2.7.0</kafka.clients.version>
         <rat.basedir>${basedir}</rat.basedir>
         <hbase.version>2.2.3</hbase.version>
+        <dlc.hive.version>2.3.7</dlc.hive.version>
     </properties>
     <dependencyManagement>
         <dependencies>
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DLCConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DLCConstant.java
new file mode 100644
index 000000000..1788cd5ba
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DLCConstant.java
@@ -0,0 +1,71 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.constant;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DLCConstant {
+    /**
+     * DLC internet access domain name.
+     */
+    public static final String DLC_ENDPOINT = "dlc.tencentcloudapi.com";
+
+    /**
+     * dlc account region
+     */
+    public static final String DLC_REGION  = "qcloud.dlc.region";
+    /**
+     * dlc account secret id
+     */
+    public static final String DLC_SECRET_ID  = "qcloud.dlc.secret-id";
+    /**
+     * dlc account secret key
+     */
+    public static final String DLC_SECRET_KEY  = "qcloud.dlc.secret-key";
+
+    /**
+     * dlc cos region
+     */
+    public static final String FS_COS_REGION  = "fs.cosn.userinfo.region";
+    /**
+     * dlc main account cos secret id
+     */
+    public static final String FS_COS_SECRET_ID  = "fs.cosn.userinfo.secretId";
+    /**
+     * dlc main account cos secret key
+     */
+    public static final String FS_COS_SECRET_KEY  = "fs.cosn.userinfo.secretKey";
+
+    public static final String FS_LAKEFS_IMPL  = "fs.lakefs.impl";
+    public static final String FS_COS_IMPL  = "fs.cosn.impl";
+    public static final String FS_COS_AUTH_PROVIDER  = "fs.cosn.credentials.provider";
+
+    public static final String DLC_CATALOG_IMPL_CLASS =
+            "org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog";
+    public static final Map<String, String> DLC_DEFAULT_IMPL =
+            Collections.unmodifiableMap(new HashMap<String, String>() {
+                {
+                    put(FS_LAKEFS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
+                    put(FS_COS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
+                    put(FS_COS_AUTH_PROVIDER, "org.apache.hadoop.fs.auth.SimpleCredentialProvider");
+                }
+            });
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index bf796207a..393ffbc5c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -30,6 +30,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
+import org.apache.inlong.sort.protocol.node.load.DLCIcebergLoadNode;
 import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
@@ -68,7 +69,8 @@ import java.util.Map;
         @JsonSubTypes.Type(value = IcebergLoadNode.class, name = "icebergLoad"),
         @JsonSubTypes.Type(value = ElasticsearchLoadNode.class, name = "elasticsearchLoad"),
         @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad"),
-        @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad")
+        @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad"),
+        @JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name = "dlcIcebergLoad")
 })
 @NoArgsConstructor
 @Data
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index c105e3c4f..d6d892b1f 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -32,6 +32,7 @@ import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
 import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
+import org.apache.inlong.sort.protocol.node.load.DLCIcebergLoadNode;
 import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
@@ -81,7 +82,8 @@ import java.util.TreeMap;
         @JsonSubTypes.Type(value = IcebergLoadNode.class, name = "icebergLoad"),
         @JsonSubTypes.Type(value = ElasticsearchLoadNode.class, name = "elasticsearchLoad"),
         @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad"),
-        @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad")
+        @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad"),
+        @JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name = "dlcIcebergLoad")
 })
 public interface Node {
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
new file mode 100644
index 000000000..41891ad8f
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
@@ -0,0 +1,134 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.DLCConstant;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@JsonTypeName("dlcIcebergLoad")
+@Data
+@NoArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+public class DLCIcebergLoadNode extends LoadNode implements Serializable {
+
+    private static final long serialVersionUID = -1L;
+
+    @JsonProperty("tableName")
+    @Nonnull
+    private String tableName;
+
+    @JsonProperty("dbName")
+    @Nonnull
+    private String dbName;
+
+    @JsonProperty("primaryKey")
+    private String primaryKey;
+
+    @JsonProperty("uri")
+    private String uri;
+
+    @JsonProperty("warehouse")
+    private String warehouse;
+
+    @JsonCreator
+    public DLCIcebergLoadNode(@JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelationShips,
+            @JsonProperty("filters") List<FilterFunction> filters,
+            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+            @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+            @JsonProperty("properties") Map<String, String> properties,
+            @Nonnull @JsonProperty("dbName") String dbName,
+            @Nonnull @JsonProperty("tableName") String tableName,
+            @JsonProperty("primaryKey") String primaryKey,
+            @JsonProperty("uri") String uri,
+            @JsonProperty("warehouse") String warehouse) {
+        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
+        this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
+        this.primaryKey = primaryKey;
+        this.uri = uri == null ? DLCConstant.DLC_ENDPOINT : uri;
+        this.warehouse = warehouse;
+        validateAuth(properties);
+    }
+
+    @Override
+    public Map<String, String> tableOptions() {
+        Map<String, String> options = super.tableOptions();
+        options.put("connector", "iceberg");
+        options.put("catalog-database", dbName);
+        options.put("catalog-table", tableName);
+        options.put("default-database", dbName);
+        options.put("catalog-name", CatalogType.HYBRIS.name());
+        options.put("catalog-impl", DLCConstant.DLC_CATALOG_IMPL_CLASS);
+        if (null != uri) {
+            options.put("uri", uri);
+        }
+        if (null != warehouse) {
+            options.put("warehouse", warehouse);
+        }
+        options.putAll(DLCConstant.DLC_DEFAULT_IMPL);
+        return options;
+    }
+
+    @Override
+    public String genTableName() {
+        return tableName;
+    }
+
+    @Override
+    public String getPrimaryKey() {
+        return primaryKey;
+    }
+
+    @Override
+    public List<FieldInfo> getPartitionFields() {
+        return super.getPartitionFields();
+    }
+
+    private void validateAuth(Map<String, String> properties) {
+        Preconditions.checkNotNull(properties);
+        Preconditions.checkNotNull(properties.get(DLCConstant.DLC_SECRET_ID), "dlc secret-id is null");
+        Preconditions.checkNotNull(properties.get(DLCConstant.DLC_SECRET_KEY), "dlc secret-key is null");
+        Preconditions.checkNotNull(properties.get(DLCConstant.DLC_REGION), "dlc region is null");
+
+        Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_REGION), "cos region is null");
+        Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_SECRET_ID), "cos secret-id is null");
+        Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_SECRET_KEY), "cos secret-key is null");
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNodeTest.java
new file mode 100644
index 000000000..0f690b75a
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNodeTest.java
@@ -0,0 +1,61 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.DLCConstant;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * test for dlc load node
+ */
+public class DLCIcebergLoadNodeTest extends SerializeBaseTest<DLCIcebergLoadNode> {
+    @Override
+    public DLCIcebergLoadNode getTestObject() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(DLCConstant.DLC_REGION, "ap-beijing");
+        properties.put(DLCConstant.DLC_SECRET_ID, "XXXXXXXXXXX");
+        properties.put(DLCConstant.DLC_SECRET_KEY, "XXXXXXXXXXX");
+
+        properties.put(DLCConstant.FS_COS_REGION, "ap-beijing");
+        properties.put(DLCConstant.FS_COS_SECRET_ID, "XXXXXXXXXXX");
+        properties.put(DLCConstant.FS_COS_SECRET_KEY, "XXXXXXXXXXX");
+        return new DLCIcebergLoadNode(
+                "iceberg_dlc",
+                "iceberg_dlc_output",
+                Arrays.asList(new FieldInfo("field", new StringFormatInfo())),
+                Arrays.asList(new FieldRelation(new FieldInfo("field", new StringFormatInfo()),
+                        new FieldInfo("field", new StringFormatInfo()))),
+                null,
+                null,
+                null,
+                properties,
+                "inlong",
+                "inlong_iceberg_dlc",
+                null,
+                null,
+                "/iceberg_dlc/warehouse");
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/pom.xml b/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
new file mode 100644
index 000000000..c09d61bf6
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>sort-connectors</artifactId>
+        <groupId>org.apache.inlong</groupId>
+        <version>1.3.0-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>sort-connector-iceberg-dlc</artifactId>
+    <name>Apache InLong - Sort-connector-iceberg-dlc</name>
+    <packaging>jar</packaging>
+    <description>
+        DLC is a data lake product.It's storage is based on iceberg.
+        Here is the product: https://cloud.tencent.com/product/dlc
+    </description>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.qcloud.cos</groupId>
+            <artifactId>hadoop-cos</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.qcloud</groupId>
+            <artifactId>cos_api-bundle</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.tencentcloudapi</groupId>
+            <artifactId>tencentcloud-sdk-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.qcloud</groupId>
+            <artifactId>dlc-data-catalog-metastore-client</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-metastore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.tencentcloudapi</groupId>
+                    <artifactId>tencentcloud-sdk-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-runtime-1.13</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>${dlc.hive.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.pentaho</groupId>
+                    <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.iceberg:*</include>
+                                    <include>org.apache.hive:hive-exec</include>
+
+                                    <!--tencent cloud-->
+                                    <include>com.qcloud.cos:*</include>
+                                    <include>com.qcloud:*</include>
+                                    <include>com.tencentcloudapi:*</include>
+                                    <include>commons-logging:commons-logging</include>
+                                    <include>com.squareup.okio:okio</include>
+                                    <include>com.squareup.okhttp:okhttp</include>
+                                    <include>com.squareup.okhttp:logging-interceptor</include>
+                                    <include>org.ini4j:ini4j</include>
+                                    <include>com.google.code.gson:gson</include>
+                                </includes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.parquet</pattern>
+                                    <shadedPattern>org.apache.hive.shaded.parquet</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/CachedClientPool.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/CachedClientPool.java
new file mode 100644
index 000000000..7167d1dd0
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/CachedClientPool.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.catalog.hybris;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CachedClientPool implements ClientPool<IMetaStoreClient, TException> {
+    private static final Logger LOG  = LoggerFactory.getLogger(CachedClientPool.class);
+    private static Cache<String, DLCWrappedHybrisClientPool> clientPoolCache;
+
+    private final Configuration conf;
+    private final String metastoreUri;
+    private final int clientPoolSize;
+    private final long evictionInterval;
+
+    CachedClientPool(Configuration conf, Map<String, String> properties) {
+        this.conf = conf;
+        this.metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+        this.clientPoolSize = PropertyUtil.propertyAsInt(properties,
+                CatalogProperties.CLIENT_POOL_SIZE,
+                CatalogProperties.CLIENT_POOL_SIZE_DEFAULT);
+        this.evictionInterval = PropertyUtil.propertyAsLong(properties,
+                CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
+                CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT);
+        init();
+    }
+
+    @VisibleForTesting
+    DLCWrappedHybrisClientPool clientPool() {
+        return clientPoolCache.get(metastoreUri, k -> new DLCWrappedHybrisClientPool(clientPoolSize, conf));
+    }
+
+    private synchronized void init() {
+        if (clientPoolCache == null) {
+            clientPoolCache = Caffeine.newBuilder().expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
+                    .removalListener((key, value, cause) -> ((DLCWrappedHybrisClientPool) value).close())
+                    .build();
+        }
+    }
+
+    @VisibleForTesting
+    static Cache<String, DLCWrappedHybrisClientPool> clientPoolCache() {
+        return clientPoolCache;
+    }
+
+    @Override
+    public <R> R run(Action<R, IMetaStoreClient, TException> action) throws TException, InterruptedException {
+        return clientPool().run(action);
+    }
+
+    @Override
+    public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
+            throws TException, InterruptedException {
+        return clientPool().run(action, retry);
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DLCWrappedHybrisClientPool.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DLCWrappedHybrisClientPool.java
new file mode 100644
index 000000000..dcc20a14f
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DLCWrappedHybrisClientPool.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.catalog.hybris;
+
+import com.qcloud.dlc.metastore.DLCDataCatalogMetastoreClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.iceberg.ClientPoolImpl;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.hive.RuntimeMetaException;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * DLC Catalog client pool.
+ */
+public class DLCWrappedHybrisClientPool extends ClientPoolImpl<IMetaStoreClient, TException> {
+
+    // use appropriate ctor depending on whether we're working with Hive2 or Hive3 dependencies
+    // we need to do this because there is a breaking API change between Hive2 and Hive3
+    private static final DynConstructors.Ctor<DLCDataCatalogMetastoreClient> CLIENT_CTOR = DynConstructors.builder()
+            .impl(DLCDataCatalogMetastoreClient.class, HiveConf.class)
+            .impl(DLCDataCatalogMetastoreClient.class, Configuration.class).build();
+
+    private final HiveConf hiveConf;
+
+    public DLCWrappedHybrisClientPool(int poolSize, Configuration conf) {
+        // Do not allow retry by default as we rely on RetryingHiveClient
+        super(poolSize, TTransportException.class, false);
+        this.hiveConf = new HiveConf(conf, DLCWrappedHybrisClientPool.class);
+        this.hiveConf.addResource(conf);
+    }
+
+    @Override
+    protected IMetaStoreClient newClient()  {
+        try {
+            try {
+                return CLIENT_CTOR.newInstance(hiveConf);
+            } catch (RuntimeException e) {
+                // any MetaException would be wrapped into RuntimeException during reflection,
+                // so let's double-check type here
+                if (e.getCause() instanceof MetaException) {
+                    throw (MetaException) e.getCause();
+                }
+                throw e;
+            }
+        } catch (MetaException e) {
+            throw new RuntimeMetaException(e, "Failed to connect to Hive Metastore");
+        } catch (Throwable t) {
+            if (t.getMessage().contains("Another instance of Derby may have already booted")) {
+                throw new RuntimeMetaException(t, "Failed to start an embedded metastore because embedded "
+                        + "Derby supports only one client at a time. To fix this, use a metastore that supports "
+                        + "multiple clients.");
+            }
+
+            throw new RuntimeMetaException(t, "Failed to connect to Hive Metastore");
+        }
+    }
+
+    @Override
+    protected IMetaStoreClient reconnect(IMetaStoreClient client) {
+        try {
+            client.close();
+            client.reconnect();
+        } catch (MetaException e) {
+            throw new RuntimeMetaException(e, "Failed to reconnect to Hive Metastore");
+        }
+        return client;
+    }
+
+    @Override
+    protected boolean isConnectionException(Exception e) {
+        return super.isConnectionException(e) || (e != null && e instanceof MetaException
+                && e.getMessage().contains("Got exception: org.apache.thrift.transport.TTransportException"));
+    }
+
+    @Override
+    protected void close(IMetaStoreClient client) {
+        client.close();
+    }
+
+    @VisibleForTesting
+    HiveConf hiveConf() {
+        return hiveConf;
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DlcWrappedHybrisCatalog.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DlcWrappedHybrisCatalog.java
new file mode 100644
index 000000000..a8f64db52
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DlcWrappedHybrisCatalog.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.catalog.hybris;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.qcloud.dlc.metastore.DLCDataCatalogMetastoreClient;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The only changed point is HiveClientPool, from
+ * {@link HiveMetaStoreClient} to {@link DLCDataCatalogMetastoreClient}
+ */
+public class DlcWrappedHybrisCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
+    public static final String LIST_ALL_TABLES = "list-all-tables";
+    public static final String LIST_ALL_TABLES_DEFAULT = "false";
+
+    // dlc auth
+    public static final String DLC_ENDPOINT = "qcloud.dlc.endpoint";
+
+
+    private static final Logger LOG = LoggerFactory.getLogger(DlcWrappedHybrisCatalog.class);
+
+    private String name;
+    private Configuration conf;
+    private FileIO fileIO;
+    private ClientPool<IMetaStoreClient, TException> clients;
+    private boolean listAllTables = false;
+
+    public DlcWrappedHybrisCatalog() {
+    }
+
+    @Override
+    public void initialize(String inputName, Map<String, String> properties) {
+        this.name = inputName;
+        if (conf == null) {
+            LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
+            this.conf = new Configuration();
+        }
+
+        if (properties.containsKey(CatalogProperties.URI)) {
+            this.conf.set(DLC_ENDPOINT, properties.get(CatalogProperties.URI));
+        }
+
+        if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
+            this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+                    properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+        }
+
+        // dlc auth
+        properties.entrySet().stream()
+                .filter(entry -> entry.getKey().startsWith("qcloud.dlc"))
+                .forEach(entry -> this.conf.set(entry.getKey(), entry.getValue()));
+
+        // lakefs auth
+        properties.entrySet().stream()
+                .filter(entry -> entry.getKey().startsWith("fs"))
+                .forEach(entry -> this.conf.set(entry.getKey(), entry.getValue()));
+
+        this.listAllTables = Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));
+
+        String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+        this.fileIO = fileIOImpl == null
+                ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
+
+        this.clients = new CachedClientPool(conf, properties);
+    }
+
+    @Override
+    public List<TableIdentifier> listTables(Namespace namespace) {
+        Preconditions.checkArgument(isValidateNamespace(namespace),
+                "Missing database in namespace: %s", namespace);
+        String database = namespace.level(0);
+
+        try {
+            List<String> tableNames = clients.run(client -> client.getAllTables(database));
+            List<TableIdentifier> tableIdentifiers;
+
+            if (listAllTables) {
+                tableIdentifiers = tableNames.stream()
+                        .map(t -> TableIdentifier.of(namespace, t))
+                        .collect(Collectors.toList());
+            } else {
+                List<Table> tableObjects = clients.run(client -> client.getTableObjectsByName(database, tableNames));
+                tableIdentifiers = tableObjects.stream()
+                        .filter(table -> table.getParameters() != null
+                                && BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
+                                    .equalsIgnoreCase(
+                                            table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
+                        .map(table -> TableIdentifier.of(namespace, table.getTableName()))
+                        .collect(Collectors.toList());
+            }
+
+            LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, tableIdentifiers);
+            return tableIdentifiers;
+
+        } catch (UnknownDBException e) {
+            throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+
+        } catch (TException e) {
+            throw new RuntimeException("Failed to list all tables under namespace " + namespace, e);
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted in call to listTables", e);
+        }
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public boolean dropTable(TableIdentifier identifier, boolean purge) {
+        if (!isValidIdentifier(identifier)) {
+            return false;
+        }
+
+        String database = identifier.namespace().level(0);
+
+        TableOperations ops = newTableOps(identifier);
+        TableMetadata lastMetadata;
+        if (purge && ops.current() != null) {
+            lastMetadata = ops.current();
+        } else {
+            lastMetadata = null;
+        }
+
+        try {
+            clients.run(client -> {
+                client.dropTable(database, identifier.name(),
+                        false /* do not delete data */,
+                        false /* throw NoSuchObjectException if the table doesn't exist */);
+                return null;
+            });
+
+            if (purge && lastMetadata != null) {
+                CatalogUtil.dropTableData(ops.io(), lastMetadata);
+            }
+
+            LOG.info("Dropped table: {}", identifier);
+            return true;
+
+        } catch (NoSuchTableException | NoSuchObjectException e) {
+            LOG.info("Skipping drop, table does not exist: {}", identifier, e);
+            return false;
+
+        } catch (TException e) {
+            throw new RuntimeException("Failed to drop " + identifier, e);
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted in call to dropTable", e);
+        }
+    }
+
+    @Override
+    public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
+        if (!isValidIdentifier(from)) {
+            throw new NoSuchTableException("Invalid identifier: %s", from);
+        }
+
+        TableIdentifier to = removeCatalogName(originalTo);
+        Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to);
+
+        String toDatabase = to.namespace().level(0);
+        String fromDatabase = from.namespace().level(0);
+        String fromName = from.name();
+
+        try {
+            Table table = clients.run(client -> client.getTable(fromDatabase, fromName));
+            HiveTableOperations.validateTableIsIceberg(table, fullTableName(name, from));
+
+            table.setDbName(toDatabase);
+            table.setTableName(to.name());
+
+            clients.run(client -> {
+                client.alter_table(fromDatabase, fromName, table);
+                return null;
+            });
+
+            LOG.info("Renamed table from {}, to {}", from, to);
+
+        } catch (NoSuchObjectException e) {
+            throw new NoSuchTableException("Table does not exist: %s", from);
+
+        } catch (AlreadyExistsException e) {
+            throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", to);
+
+        } catch (TException e) {
+            throw new RuntimeException("Failed to rename " + from + " to " + to, e);
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted in call to rename", e);
+        }
+    }
+
+    @Override
+    public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
+        Preconditions.checkArgument(isValidIdentifier(identifier), "Invalid identifier: %s", identifier);
+
+        // Throw an exception if this table already exists in the catalog.
+        if (tableExists(identifier)) {
+            throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", identifier);
+        }
+
+        TableOperations ops = newTableOps(identifier);
+        InputFile metadataFile = fileIO.newInputFile(metadataFileLocation);
+        TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
+        ops.commit(null, metadata);
+
+        return new BaseTable(ops, identifier.toString());
+    }
+
+    @Override
+    public void createNamespace(Namespace namespace, Map<String, String> meta) {
+        Preconditions.checkArgument(
+                !namespace.isEmpty(),
+                "Cannot create namespace with invalid name: %s", namespace);
+        Preconditions.checkArgument(isValidateNamespace(namespace),
+                "Cannot support multi part namespace in Hive Metastore: %s", namespace);
+
+        try {
+            clients.run(client -> {
+                client.createDatabase(convertToDatabase(namespace, meta));
+                return null;
+            });
+
+            LOG.info("Created namespace: {}", namespace);
+
+        } catch (AlreadyExistsException e) {
+            throw new org.apache.iceberg.exceptions.AlreadyExistsException(e, "Namespace '%s' already exists!",
+                    namespace);
+
+        } catch (TException e) {
+            throw new RuntimeException("Failed to create namespace " + namespace + " in Hive Metastore", e);
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(
+                    "Interrupted in call to createDatabase(name) " + namespace + " in Hive Metastore", e);
+        }
+    }
+
+    @Override
+    public List<Namespace> listNamespaces(Namespace namespace) {
+        if (!isValidateNamespace(namespace) && !namespace.isEmpty()) {
+            throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+        }
+        if (!namespace.isEmpty()) {
+            return ImmutableList.of();
+        }
+        try {
+            List<Namespace> namespaces = clients.run(IMetaStoreClient::getAllDatabases)
+                    .stream()
+                    .map(Namespace::of)
+                    .collect(Collectors.toList());
+
+            LOG.debug("Listing namespace {} returned tables: {}", namespace, namespaces);
+            return namespaces;
+
+        } catch (TException e) {
+            throw new RuntimeException("Failed to list all namespace: " + namespace + " in Hive Metastore",  e);
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(
+                    "Interrupted in call to getAllDatabases() " + namespace + " in Hive Metastore", e);
+        }
+    }
+
+    @Override
+    public boolean dropNamespace(Namespace namespace) {
+        if (!isValidateNamespace(namespace)) {
+            return false;
+        }
+
+        try {
+            clients.run(client -> {
+                client.dropDatabase(namespace.level(0),
+                        false /* deleteData */,
+                        false /* ignoreUnknownDb */,
+                        false /* cascade */);
+                return null;
+            });
+
+            LOG.info("Dropped namespace: {}", namespace);
+            return true;
+
+        } catch (InvalidOperationException e) {
+            throw new NamespaceNotEmptyException(e, "Namespace %s is not empty. One or more tables exist.", namespace);
+
+        } catch (NoSuchObjectException e) {
+            return false;
+
+        } catch (TException e) {
+            throw new RuntimeException("Failed to drop namespace " + namespace + " in Hive Metastore", e);
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(
+                    "Interrupted in call to drop dropDatabase(name) " + namespace + " in Hive Metastore", e);
+        }
+    }
+
+    @Override
+    public boolean setProperties(Namespace namespace,  Map<String, String> properties) {
+        Map<String, String> parameter = Maps.newHashMap();
+
+        parameter.putAll(loadNamespaceMetadata(namespace));
+        parameter.putAll(properties);
+        Database database = convertToDatabase(namespace, parameter);
+
+        alterHiveDataBase(namespace, database);
+        LOG.debug("Successfully set properties {} for {}", properties.keySet(), namespace);
+
+        // Always successful, otherwise exception is thrown
+        return true;
+    }
+
+    @Override
+    public boolean removeProperties(Namespace namespace,  Set<String> properties) {
+        Map<String, String> parameter = Maps.newHashMap();
+
+        parameter.putAll(loadNamespaceMetadata(namespace));
+        properties.forEach(key -> parameter.put(key, null));
+        Database database = convertToDatabase(namespace, parameter);
+
+        alterHiveDataBase(namespace, database);
+        LOG.debug("Successfully removed properties {} from {}", properties, namespace);
+
+        // Always successful, otherwise exception is thrown
+        return true;
+    }
+
+    private void alterHiveDataBase(Namespace namespace,  Database database) {
+        try {
+            clients.run(client -> {
+                client.alterDatabase(namespace.level(0), database);
+                return null;
+            });
+
+        } catch (NoSuchObjectException | UnknownDBException e) {
+            throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace);
+
+        } catch (TException e) {
+            throw new RuntimeException(
+                    "Failed to list namespace under namespace: " + namespace + " in Hive Metastore", e);
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(
+                    "Interrupted in call to getDatabase(name) " + namespace + " in Hive Metastore", e);
+        }
+    }
+
+    @Override
+    public Map<String, String> loadNamespaceMetadata(Namespace namespace) {
+        if (!isValidateNamespace(namespace)) {
+            throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+        }
+
+        try {
+            Database database = clients.run(client -> client.getDatabase(namespace.level(0)));
+            Map<String, String> metadata = convertToMetadata(database);
+            LOG.debug("Loaded metadata for namespace {} found {}", namespace, metadata.keySet());
+            return metadata;
+
+        } catch (NoSuchObjectException | UnknownDBException e) {
+            throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace);
+
+        } catch (TException e) {
+            throw new RuntimeException(
+                    "Failed to list namespace under namespace: " + namespace + " in Hive Metastore", e);
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(
+                    "Interrupted in call to getDatabase(name) " + namespace + " in Hive Metastore", e);
+        }
+    }
+
+    @Override
+    protected boolean isValidIdentifier(TableIdentifier tableIdentifier) {
+        return tableIdentifier.namespace().levels().length == 1;
+    }
+
+    private TableIdentifier removeCatalogName(TableIdentifier to) {
+        if (isValidIdentifier(to)) {
+            return to;
+        }
+
+        // check if the identifier includes the catalog name and remove it
+        if (to.namespace().levels().length == 2 && name().equalsIgnoreCase(to.namespace().level(0))) {
+            return TableIdentifier.of(Namespace.of(to.namespace().level(1)), to.name());
+        }
+
+        // return the original unmodified
+        return to;
+    }
+
+    private boolean isValidateNamespace(Namespace namespace) {
+        return namespace.levels().length == 1;
+    }
+
+    @Override
+    public TableOperations newTableOps(TableIdentifier tableIdentifier) {
+        String dbName = tableIdentifier.namespace().level(0);
+        String tableName = tableIdentifier.name();
+        return new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName);
+    }
+
+    @Override
+    protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+        // This is a little edgy since we basically duplicate the HMS location generation logic.
+        // Sadly I do not see a good way around this if we want to keep the order of events, like:
+        // - Create meta files
+        // - Create the metadata in HMS, and this way committing the changes
+
+        // Create a new location based on the namespace / database if it is set on database level
+        try {
+            Database databaseData = clients.run(client -> client.getDatabase(tableIdentifier.namespace().levels()[0]));
+            if (databaseData.getLocationUri() != null) {
+                // If the database location is set use it as a base.
+                return String.format("%s/%s", databaseData.getLocationUri(), tableIdentifier.name());
+            }
+
+        } catch (TException e) {
+            throw new RuntimeException(String.format("Metastore operation failed for %s", tableIdentifier), e);
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted during commit", e);
+        }
+
+        // Otherwise stick to the {WAREHOUSE_DIR}/{DB_NAME}.db/{TABLE_NAME} path
+        String warehouseLocation = getWarehouseLocation();
+        return String.format(
+                "%s/%s.db/%s",
+                warehouseLocation,
+                tableIdentifier.namespace().levels()[0],
+                tableIdentifier.name());
+    }
+
+    private String getWarehouseLocation() {
+        String warehouseLocation = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
+        Preconditions.checkNotNull(warehouseLocation,
+                "Warehouse location is not set: hive.metastore.warehouse.dir=null");
+        return warehouseLocation;
+    }
+
+    private Map<String, String> convertToMetadata(Database database) {
+
+        Map<String, String> meta = Maps.newHashMap();
+
+        meta.putAll(database.getParameters());
+        meta.put("location", database.getLocationUri());
+        if (database.getDescription() != null) {
+            meta.put("comment", database.getDescription());
+        }
+
+        return meta;
+    }
+
+    Database convertToDatabase(Namespace namespace, Map<String, String> meta) {
+        if (!isValidateNamespace(namespace)) {
+            throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+        }
+
+        Database database = new Database();
+        Map<String, String> parameter = Maps.newHashMap();
+
+        database.setName(namespace.level(0));
+        database.setLocationUri(new Path(getWarehouseLocation(), namespace.level(0)).toString() + ".db");
+
+        meta.forEach((key, value) -> {
+            if (key.equals("comment")) {
+                database.setDescription(value);
+            } else if (key.equals("location")) {
+                database.setLocationUri(value);
+            } else {
+                if (value != null) {
+                    parameter.put(key, value);
+                }
+            }
+        });
+        database.setParameters(parameter);
+
+        return database;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("name", name)
+                .add("uri", this.conf == null ? "" : this.conf.get(HiveConf.ConfVars.METASTOREURIS.varname))
+                .toString();
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = new Configuration(conf);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @VisibleForTesting
+    void setListAllTables(boolean listAllTables) {
+        this.listAllTables = listAllTables;
+    }
+}
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/HiveTableOperations.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/HiveTableOperations.java
new file mode 100644
index 000000000..62cd7a939
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/HiveTableOperations.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.catalog.hybris;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.ConfigProperties;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.BiMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.GC_ENABLED;
+
+/**
+ * TODO we should be able to extract some more commonalities to BaseMetastoreTableOperations to
+ * avoid code duplication between this class and Metacat Tables.
+ */
+public class HiveTableOperations extends BaseMetastoreTableOperations {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class);
+
+    private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
+    private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
+    private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+    private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES = "iceberg.hive.metadata-refresh-max-retries";
+    private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = "iceberg.hive.table-level-lock-evict-ms";
+    private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+    private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+    private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+    private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2;
+    private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
+    private static final DynMethods.UnboundMethod ALTER_TABLE = DynMethods.builder("alter_table")
+            .impl(IMetaStoreClient.class, "alter_table_with_environmentContext",
+                    String.class, String.class, Table.class, EnvironmentContext.class)
+            .impl(IMetaStoreClient.class, "alter_table",
+                    String.class, String.class, Table.class, EnvironmentContext.class)
+            .impl(IMetaStoreClient.class, "alter_table",
+                    String.class, String.class, Table.class)
+            .build();
+    private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION = ImmutableBiMap.of(
+            // gc.enabled in Iceberg and external.table.purge in Hive
+            //      are meant to do the same things but with different names
+            GC_ENABLED, "external.table.purge"
+    );
+
+    private static Cache<String, ReentrantLock> commitLockCache;
+
+    private static synchronized void initTableLevelLockCache(long evictionTimeout) {
+        if (commitLockCache == null) {
+            commitLockCache = Caffeine.newBuilder()
+                    .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+                    .build();
+        }
+    }
+
+    /**
+     * Provides key translation where necessary between Iceberg and HMS props. This translation is needed because some
+     * properties control the same behaviour but are named differently in Iceberg and Hive. Therefore changes to these
+     * property pairs should be synchronized.
+     *
+     * Example: Deleting data files upon DROP TABLE is enabled using gc.enabled=true in Iceberg and
+     * external.table.purge=true in Hive. Hive and Iceberg users are unaware of each other's control flags, therefore
+     * inconsistent behaviour can occur from e.g. a Hive user's point of view if external.table.purge=true is set on the
+     * HMS table but gc.enabled=false is set on the Iceberg table, resulting in no data file deletion.
+     *
+     * @param hmsProp The HMS property that should be translated to Iceberg property
+     * @return Iceberg property equivalent to the hmsProp.
+     *         If no such translation exists, the original hmsProp is returned
+     */
+    public static String translateToIcebergProp(String hmsProp) {
+        return ICEBERG_TO_HMS_TRANSLATION.inverse().getOrDefault(hmsProp, hmsProp);
+    }
+
+    private static class WaitingForLockException extends RuntimeException {
+        WaitingForLockException(String message) {
+            super(message);
+        }
+    }
+
+    private final String fullName;
+    private final String database;
+    private final String tableName;
+    private final Configuration conf;
+    private final long lockAcquireTimeout;
+    private final long lockCheckMinWaitTime;
+    private final long lockCheckMaxWaitTime;
+    private final int metadataRefreshMaxRetries;
+    private final FileIO fileIO;
+    private final ClientPool<IMetaStoreClient, TException> metaClients;
+
+    protected HiveTableOperations(Configuration conf, ClientPool metaClients, FileIO fileIO,
+            String catalogName, String database, String table) {
+        this.conf = conf;
+        this.metaClients = metaClients;
+        this.fileIO = fileIO;
+        this.fullName = catalogName + "." + database + "." + table;
+        this.database = database;
+        this.tableName = table;
+        this.lockAcquireTimeout =
+                conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+        this.lockCheckMinWaitTime =
+                conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+        this.lockCheckMaxWaitTime =
+                conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+        this.metadataRefreshMaxRetries =
+                conf.getInt(HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES,
+                        HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT);
+        long tableLevelLockCacheEvictionTimeout =
+                conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+        initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
+    }
+
+    @Override
+    protected String tableName() {
+        return fullName;
+    }
+
+    @Override
+    public FileIO io() {
+        return fileIO;
+    }
+
+    @Override
+    protected void doRefresh() {
+        String metadataLocation = null;
+        try {
+            Table table = metaClients.run(client -> client.getTable(database, tableName));
+            validateTableIsIceberg(table, fullName);
+
+            metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
+
+        } catch (NoSuchObjectException e) {
+            if (currentMetadataLocation() != null) {
+                throw new NoSuchTableException("No such table: %s.%s", database, tableName);
+            }
+
+        } catch (TException e) {
+            String errMsg = String.format("Failed to get table info from metastore %s.%s", database, tableName);
+            throw new RuntimeException(errMsg, e);
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted during refresh", e);
+        }
+
+        refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries);
+    }
+
+    @SuppressWarnings("checkstyle:CyclomaticComplexity")
+    @Override
+    protected void doCommit(TableMetadata base, TableMetadata metadata) {
+        String newMetadataLocation = base == null && metadata.metadataFileLocation() != null
+                ? metadata.metadataFileLocation() : writeNewMetadata(metadata, currentVersion() + 1);
+        boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
+        boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false);
+
+        CommitStatus commitStatus = CommitStatus.FAILURE;
+        boolean updateHiveTable = false;
+        Optional<Long> lockId = Optional.empty();
+        // getting a process-level lock per table to avoid concurrent commit attempts to the same table from the same
+        // JVM process, which would result in unnecessary and costly HMS lock acquisition requests
+        ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true));
+        tableLevelMutex.lock();
+        try {
+            lockId = Optional.of(acquireLock());
+            // TODO add lock heart beating for cases where default lock timeout is too low.
+
+            Table tbl = loadHmsTable();
+
+            if (tbl != null) {
+                // If we try to create the table but the metadata location is already set,
+                // then we had a concurrent commit
+                if (base == null
+                        && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) != null) {
+                    throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);
+                }
+
+                updateHiveTable = true;
+                LOG.debug("Committing existing table: {}", fullName);
+            } else {
+                tbl = newHmsTable();
+                LOG.debug("Committing new table: {}", fullName);
+            }
+
+            tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to pickup any schema changes
+
+            String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
+            String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
+            if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
+                throw new CommitFailedException(
+                        "Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s",
+                        baseMetadataLocation, metadataLocation, database, tableName);
+            }
+
+            // get Iceberg props that have been removed
+            Set<String> removedProps = Collections.emptySet();
+            if (base != null) {
+                removedProps = base.properties().keySet().stream()
+                        .filter(key -> !metadata.properties().containsKey(key))
+                        .collect(Collectors.toSet());
+            }
+
+            Map<String, String> summary = Optional.ofNullable(metadata.currentSnapshot())
+                    .map(Snapshot::summary)
+                    .orElseGet(ImmutableMap::of);
+            setHmsTableParameters(newMetadataLocation, tbl, metadata, removedProps, hiveEngineEnabled, summary);
+
+            if (!keepHiveStats) {
+                tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
+            }
+
+            try {
+                persistTable(tbl, updateHiveTable);
+                commitStatus = CommitStatus.SUCCESS;
+            } catch (Throwable persistFailure) {
+                LOG.error("Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
+                        database, tableName, persistFailure);
+                commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+                switch (commitStatus) {
+                    case SUCCESS:
+                        break;
+                    case FAILURE:
+                        throw persistFailure;
+                    case UNKNOWN:
+                        throw new CommitStateUnknownException(persistFailure);
+                }
+            }
+        } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
+            throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);
+
+        } catch (TException | UnknownHostException e) {
+            if (e.getMessage() != null && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
+                throw new RuntimeException("Failed to acquire locks from metastore because 'HIVE_LOCKS' doesn't "
+                        + "exist, this probably happened when using embedded metastore or doesn't create a "
+                        + "transactional meta table. To fix this, use an alternative metastore", e);
+            }
+
+            throw new RuntimeException(String.format("Metastore operation failed for %s.%s", database, tableName), e);
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted during commit", e);
+
+        } finally {
+            cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex);
+        }
+    }
+
+    @VisibleForTesting
+    void persistTable(Table hmsTable, boolean updateHiveTable) throws TException, InterruptedException {
+        if (updateHiveTable) {
+            metaClients.run(client -> {
+                EnvironmentContext envContext = new EnvironmentContext(
+                        ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE)
+                );
+                ALTER_TABLE.invoke(client, database, tableName, hmsTable, envContext);
+                return null;
+            });
+        } else {
+            metaClients.run(client -> {
+                client.createTable(hmsTable);
+                return null;
+            });
+        }
+    }
+
+    private Table loadHmsTable() throws TException, InterruptedException {
+        try {
+            return metaClients.run(client -> client.getTable(database, tableName));
+        } catch (NoSuchObjectException nte) {
+            LOG.trace("Table not found {}", fullName, nte);
+            return null;
+        }
+    }
+
+    private Table newHmsTable() {
+        final long currentTimeMillis = System.currentTimeMillis();
+
+        Table newTable = new Table(tableName,
+                database,
+                System.getProperty("user.name"),
+                (int) currentTimeMillis / 1000,
+                (int) currentTimeMillis / 1000,
+                Integer.MAX_VALUE,
+                null,
+                Collections.emptyList(),
+                Maps.newHashMap(),
+                null,
+                null,
+                TableType.EXTERNAL_TABLE.toString());
+
+        newTable.getParameters().put("EXTERNAL", "TRUE"); // using the external table type also requires this
+        return newTable;
+    }
+
+    private void setHmsTableParameters(String newMetadataLocation, Table tbl, TableMetadata metadata,
+            Set<String> obsoleteProps, boolean hiveEngineEnabled,
+            Map<String, String> summary) {
+        Map<String, String> parameters = Optional.ofNullable(tbl.getParameters())
+                .orElseGet(Maps::newHashMap);
+
+        // push all Iceberg table properties into HMS
+        metadata.properties().forEach((key, value) -> {
+            // translate key names between Iceberg and HMS where needed
+            String hmsKey = ICEBERG_TO_HMS_TRANSLATION.getOrDefault(key, key);
+            parameters.put(hmsKey, value);
+        });
+        if (metadata.uuid() != null) {
+            parameters.put(TableProperties.UUID, metadata.uuid());
+        }
+
+        // remove any props from HMS that are no longer present in Iceberg table props
+        obsoleteProps.forEach(parameters::remove);
+
+        parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
+        parameters.put(METADATA_LOCATION_PROP, newMetadataLocation);
+
+        if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) {
+            parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation());
+        }
+
+        // If needed set the 'storage_handler' property to enable query from Hive
+        if (hiveEngineEnabled) {
+            parameters.put(hive_metastoreConstants.META_TABLE_STORAGE,
+                    "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler");
+        } else {
+            parameters.remove(hive_metastoreConstants.META_TABLE_STORAGE);
+        }
+
+        // Set the basic statistics
+        if (summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP) != null) {
+            parameters.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+        }
+        if (summary.get(SnapshotSummary.TOTAL_RECORDS_PROP) != null) {
+            parameters.put(StatsSetupConst.ROW_COUNT, summary.get(SnapshotSummary.TOTAL_RECORDS_PROP));
+        }
+        if (summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP) != null) {
+            parameters.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
+        }
+
+        tbl.setParameters(parameters);
+    }
+
+    private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) {
+
+        final StorageDescriptor storageDescriptor = new StorageDescriptor();
+        storageDescriptor.setCols(HiveSchemaUtil.convert(metadata.schema()));
+        storageDescriptor.setLocation(metadata.location());
+        SerDeInfo serDeInfo = new SerDeInfo();
+        serDeInfo.setParameters(Maps.newHashMap());
+        if (hiveEngineEnabled) {
+            storageDescriptor.setInputFormat("org.apache.iceberg.mr.hive.HiveIcebergInputFormat");
+            storageDescriptor.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat");
+            serDeInfo.setSerializationLib("org.apache.iceberg.mr.hive.HiveIcebergSerDe");
+        } else {
+            storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileOutputFormat");
+            storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
+            serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
+        }
+        storageDescriptor.setSerdeInfo(serDeInfo);
+        return storageDescriptor;
+    }
+
+    @VisibleForTesting
+    long acquireLock() throws UnknownHostException, TException, InterruptedException {
+        final LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
+        lockComponent.setTablename(tableName);
+        final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent),
+                System.getProperty("user.name"),
+                InetAddress.getLocalHost().getHostName());
+        LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest));
+        AtomicReference<LockState> state = new AtomicReference<>(lockResponse.getState());
+        long lockId = lockResponse.getLockid();
+
+        final long start = System.currentTimeMillis();
+        long duration = 0;
+        boolean timeout = false;
+
+        try {
+            if (state.get().equals(LockState.WAITING)) {
+                // Retry count is the typical "upper bound of retries" for Tasks.run() function.
+                // In fact, the maximum number of
+                // attempts the Tasks.run() would try is `retries + 1`. Here, for checking locks, we use timeout as the
+                // upper bound of retries. So it is just reasonable to set a large retry count. However, if we set
+                // Integer.MAX_VALUE, the above logic of `retries + 1` would overflow into Integer.MIN_VALUE. Hence,
+                // the retry is set conservatively as `Integer.MAX_VALUE - 100` so it doesn't hit any boundary issues.
+                Tasks.foreach(lockId)
+                        .retry(Integer.MAX_VALUE - 100)
+                        .exponentialBackoff(
+                                lockCheckMinWaitTime,
+                                lockCheckMaxWaitTime,
+                                lockAcquireTimeout,
+                                1.5)
+                        .throwFailureWhenFinished()
+                        .onlyRetryOn(WaitingForLockException.class)
+                        .run(id -> {
+                            try {
+                                LockResponse response = metaClients.run(client -> client.checkLock(id));
+                                LockState newState = response.getState();
+                                state.set(newState);
+                                if (newState.equals(LockState.WAITING)) {
+                                    throw new WaitingForLockException("Waiting for lock.");
+                                }
+                            } catch (InterruptedException e) {
+                                Thread.interrupted(); // Clear the interrupt status flag
+                                LOG.warn("Interrupted while waiting for lock.", e);
+                            }
+                        }, TException.class);
+            }
+        } catch (WaitingForLockException waitingForLockException) {
+            timeout = true;
+            duration = System.currentTimeMillis() - start;
+        } finally {
+            if (!state.get().equals(LockState.ACQUIRED)) {
+                unlock(Optional.of(lockId));
+            }
+        }
+
+        // timeout and do not have lock acquired
+        if (timeout && !state.get().equals(LockState.ACQUIRED)) {
+            throw new CommitFailedException("Timed out after %s ms waiting for lock on %s.%s",
+                    duration, database, tableName);
+        }
+
+        if (!state.get().equals(LockState.ACQUIRED)) {
+            throw new CommitFailedException("Could not acquire the lock on %s.%s, "
+                    + "lock request ended in state %s", database, tableName, state);
+        }
+        return lockId;
+    }
+
+    private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, Optional<Long> lockId,
+            ReentrantLock tableLevelMutex) {
+        try {
+            if (commitStatus == CommitStatus.FAILURE) {
+                // If we are sure the commit failed, clean up the uncommitted metadata file
+                io().deleteFile(metadataLocation);
+            }
+        } catch (RuntimeException e) {
+            LOG.error("Fail to cleanup metadata file at {}", metadataLocation, e);
+            throw e;
+        } finally {
+            unlock(lockId);
+            tableLevelMutex.unlock();
+        }
+    }
+
+    private void unlock(Optional<Long> lockId) {
+        if (lockId.isPresent()) {
+            try {
+                doUnlock(lockId.get());
+            } catch (Exception e) {
+                LOG.warn("Failed to unlock {}.{}", database, tableName, e);
+            }
+        }
+    }
+
+    @VisibleForTesting
+    void doUnlock(long lockId) throws TException, InterruptedException {
+        metaClients.run(client -> {
+            client.unlock(lockId);
+            return null;
+        });
+    }
+
+    static void validateTableIsIceberg(Table table, String fullName) {
+        String tableType = table.getParameters().get(TABLE_TYPE_PROP);
+        NoSuchIcebergTableException.check(tableType != null && tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE),
+                "Not an iceberg table: %s (type=%s)", fullName, tableType);
+    }
+
+    /**
+     * Returns if the hive engine related values should be enabled on the table, or not.
+     * <p>
+     * The decision is made like this:
+     * <ol>
+     * <li>Table property value {@link TableProperties#ENGINE_HIVE_ENABLED}
+     * <li>If the table property is not set then check the hive-site.xml property value
+     * {@link ConfigProperties#ENGINE_HIVE_ENABLED}
+     * <li>If none of the above is enabled then use the default value
+     * {@link TableProperties#ENGINE_HIVE_ENABLED_DEFAULT}
+     * </ol>
+     * @param metadata Table metadata to use
+     * @param conf The hive configuration to use
+     * @return if the hive engine related values should be enabled or not
+     */
+    private static boolean hiveEngineEnabled(TableMetadata metadata, Configuration conf) {
+        if (metadata.properties().get(TableProperties.ENGINE_HIVE_ENABLED) != null) {
+            // We know that the property is set, so default value will not be used,
+            return metadata.propertyAsBoolean(TableProperties.ENGINE_HIVE_ENABLED, false);
+        }
+
+        return conf.getBoolean(ConfigProperties.ENGINE_HIVE_ENABLED, TableProperties.ENGINE_HIVE_ENABLED_DEFAULT);
+    }
+}
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index f0417cee3..8431d6867 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -40,7 +40,9 @@
         <module>kafka</module>
         <module>jdbc</module>
         <module>pulsar</module>
+
         <module>iceberg</module>
+        <module>iceberg-dlc</module>
         <module>hbase</module>
         <module>postgres-cdc</module>
         <module>mongodb-cdc</module>
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DLCIcebergSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DLCIcebergSqlParseTest.java
new file mode 100644
index 000000000..5857fc70c
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DLCIcebergSqlParseTest.java
@@ -0,0 +1,147 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.constant.DLCConstant;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.DLCIcebergLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * test DLC search sql parse
+ */
+public class DLCIcebergSqlParseTest {
+    private MySqlExtractNode buildMySQLExtractNode(String id) {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("id", new IntFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()));
+        // if you hope hive load mode of append, please add this config
+        Map<String, String> map = new HashMap<>();
+        return new MySqlExtractNode(id,
+                "mysql_input",
+                fields,
+                null,
+                map,
+                "id",
+                Collections.singletonList("test"),
+                "localhost",
+                "root",
+                "123456",
+                "inlong",
+                3306,
+                null,
+                null,
+                null);
+    }
+
+    private DLCIcebergLoadNode buildDLCLoadNode() {
+        // set HIVE_CONF_DIR,or set uri and warehouse
+        Map<String, String> properties = new HashMap<>();
+        properties.put(DLCConstant.DLC_REGION, "ap-beijing");
+        properties.put(DLCConstant.DLC_SECRET_ID, "XXXXXXXXXXX");
+        properties.put(DLCConstant.DLC_SECRET_KEY, "XXXXXXXXXXX");
+
+        properties.put(DLCConstant.FS_COS_REGION, "ap-beijing");
+        properties.put(DLCConstant.FS_COS_SECRET_ID, "XXXXXXXXXXX");
+        properties.put(DLCConstant.FS_COS_SECRET_KEY, "XXXXXXXXXXX");
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new IntFormatInfo()),
+                                new FieldInfo("id", new IntFormatInfo())),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())));
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("id", new IntFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()));
+        return new DLCIcebergLoadNode(
+                "iceberg",
+                "iceberg_output",
+                fields,
+                relations,
+                null,
+                null,
+                null,
+                properties,
+                "inlong",
+                "dlc_output",
+                "id",
+                null,
+                "/hive/warehouse");
+    }
+
+    /**
+     * build node relation
+     *
+     * @param inputs  extract node
+     * @param outputs load node
+     * @return node relation
+     */
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    /**
+     * Test mysql to DLC
+     *
+     * @throws Exception The exception may throws when execute the case
+     */
+    @Test
+    public void testDLCIceberg() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildMySQLExtractNode("1");
+        Node outputNode = buildDLCLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(inputNode, outputNode),
+                Arrays.asList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("group_id", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        FlinkSqlParseResult result = (FlinkSqlParseResult) parser.parse();
+        Assert.assertTrue(!result.getLoadSqls().isEmpty() && !result.getCreateTableSqls().isEmpty());
+    }
+}
\ No newline at end of file
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 1fda35242..ee1c8d287 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -756,6 +756,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
   org.codehaus.jackson:jackson-mapper-asl:1.9.13 - Data Mapper for Jackson (https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-mapper-asl/1.9.13), (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.7.8 - Jackson module: JAXB Annotations (https://github.com/FasterXML/jackson-modules-base), (The Apache Software License, Version 2.0)
   com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.10.5 - Jackson module: JAXB Annotations (https://github.com/FasterXML/jackson-modules-base), (The Apache Software License, Version 2.0)
+  com.qcloud:cos_api-bundle:5.6.35 - COS SDK for Java - Bundle (The Apache Software License, Version 2.0)
   org.codehaus.jackson:jackson-xc:1.9.2 - Xml Compatibility extensions for Jackson (https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-xc/1.9.2), (The Apache Software License, Version 2.0;  GNU Lesser General Public License (LGPL), Version 2.1)
   org.javassist:javassist:3.24.0-GA - Javassist (https://github.com/jboss-javassist/javassist/tree/rel_3_24_0_ga), (Apache License 2.0;  MPL 1.1;  LGPL 2.1)
   org.mortbay.jetty:jetty:6.1.26 - Jetty Server (http://www.eclipse.org/jetty/jetty-parent/project/modules/jetty), (Apache Software License - Version 2.0;  EPL 1.0)
diff --git a/licenses/inlong-sort-connectors/licenses/LICENSE-cos_api-bundle.txt b/licenses/inlong-sort-connectors/licenses/LICENSE-cos_api-bundle.txt
new file mode 100644
index 000000000..75b52484e
--- /dev/null
+++ b/licenses/inlong-sort-connectors/licenses/LICENSE-cos_api-bundle.txt
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/licenses/inlong-sort-connectors/notices/NOTICE-cos_api-bundle.txt b/licenses/inlong-sort-connectors/notices/NOTICE-cos_api-bundle.txt
new file mode 100644
index 000000000..7b2ab9c06
--- /dev/null
+++ b/licenses/inlong-sort-connectors/notices/NOTICE-cos_api-bundle.txt
@@ -0,0 +1,8 @@
+
+Apache HttpClient
+Copyright 1999-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
diff --git a/pom.xml b/pom.xml
index 684f7d884..f4f87f42d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -238,6 +238,10 @@
         <jcommander.version>1.78</jcommander.version>
         <je.version>7.3.7</je.version>
         <tencentcloud.cls.version>1.0.5</tencentcloud.cls.version>
+        <tencentcloud.api.version>3.1.439</tencentcloud.api.version>
+        <cos.hadoop.version>2.7.5-5.9.3</cos.hadoop.version>
+        <cos.bundle.version>5.6.35</cos.bundle.version>
+        <dlc.client.version>1.0</dlc.client.version>
         <esri-geometry-api.version>2.0.0</esri-geometry-api.version>
         <HikariCP.version>4.0.3</HikariCP.version>
 
@@ -1011,6 +1015,11 @@
                 <artifactId>iceberg-flink-runtime-1.13</artifactId>
                 <version>${iceberg.flink.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.iceberg</groupId>
+                <artifactId>iceberg-flink-runtime-1.13</artifactId>
+                <version>${iceberg.flink.version}</version>
+            </dependency>
 
             <dependency>
                 <groupId>org.apache.flink</groupId>
@@ -1498,6 +1507,26 @@
                     </exclusion>
                 </exclusions>
             </dependency>
+            <dependency>
+                <groupId>com.tencentcloudapi</groupId>
+                <artifactId>tencentcloud-sdk-java</artifactId>
+                <version>${tencentcloud.api.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.qcloud.cos</groupId>
+                <artifactId>hadoop-cos</artifactId>
+                <version>${cos.hadoop.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.qcloud</groupId>
+                <artifactId>cos_api-bundle</artifactId>
+                <version>${cos.bundle.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.qcloud</groupId>
+                <artifactId>dlc-data-catalog-metastore-client</artifactId>
+                <version>${dlc.client.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>