Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/14 12:07:11 UTC
[incubator-inlong] branch master updated: [INLONG-4431][Sort] Sort lightweight support load data to DLC (#4655)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 272774e54 [INLONG-4431][Sort] Sort lightweight support load data to DLC (#4655)
272774e54 is described below
commit 272774e549b2c7a6366157cb907748b1ffae62db
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Tue Jun 14 20:07:04 2022 +0800
[INLONG-4431][Sort] Sort lightweight support load data to DLC (#4655)
---
inlong-sort/pom.xml | 1 +
.../inlong/sort/protocol/constant/DLCConstant.java | 71 +++
.../apache/inlong/sort/protocol/node/LoadNode.java | 4 +-
.../org/apache/inlong/sort/protocol/node/Node.java | 4 +-
.../protocol/node/load/DLCIcebergLoadNode.java | 134 +++++
.../protocol/node/load/DLCIcebergLoadNodeTest.java | 61 +++
inlong-sort/sort-connectors/iceberg-dlc/pom.xml | 138 +++++
.../iceberg/catalog/hybris/CachedClientPool.java | 86 ++++
.../catalog/hybris/DLCWrappedHybrisClientPool.java | 106 ++++
.../catalog/hybris/DlcWrappedHybrisCatalog.java | 573 +++++++++++++++++++++
.../catalog/hybris/HiveTableOperations.java | 560 ++++++++++++++++++++
inlong-sort/sort-connectors/pom.xml | 2 +
.../inlong/sort/parser/DLCIcebergSqlParseTest.java | 147 ++++++
licenses/inlong-sort-connectors/LICENSE | 1 +
.../licenses/LICENSE-cos_api-bundle.txt | 202 ++++++++
.../notices/NOTICE-cos_api-bundle.txt | 8 +
pom.xml | 29 ++
17 files changed, 2125 insertions(+), 2 deletions(-)
diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index 8cc3a51f8..48503587d 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -46,6 +46,7 @@
<kafka.clients.version>2.7.0</kafka.clients.version>
<rat.basedir>${basedir}</rat.basedir>
<hbase.version>2.2.3</hbase.version>
+ <dlc.hive.version>2.3.7</dlc.hive.version>
</properties>
<dependencyManagement>
<dependencies>
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DLCConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DLCConstant.java
new file mode 100644
index 000000000..1788cd5ba
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DLCConstant.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.constant;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DLCConstant {
+ /**
+ * DLC internet access domain name.
+ */
+ public static final String DLC_ENDPOINT = "dlc.tencentcloudapi.com";
+
+ /**
+ * DLC account region.
+ */
+ public static final String DLC_REGION = "qcloud.dlc.region";
+ /**
+ * DLC account secret id.
+ */
+ public static final String DLC_SECRET_ID = "qcloud.dlc.secret-id";
+ /**
+ * DLC account secret key.
+ */
+ public static final String DLC_SECRET_KEY = "qcloud.dlc.secret-key";
+
+ /**
+ * DLC COS region.
+ */
+ public static final String FS_COS_REGION = "fs.cosn.userinfo.region";
+ /**
+ * DLC main account COS secret id.
+ */
+ public static final String FS_COS_SECRET_ID = "fs.cosn.userinfo.secretId";
+ /**
+ * DLC main account COS secret key.
+ */
+ public static final String FS_COS_SECRET_KEY = "fs.cosn.userinfo.secretKey";
+
+ public static final String FS_LAKEFS_IMPL = "fs.lakefs.impl";
+ public static final String FS_COS_IMPL = "fs.cosn.impl";
+ public static final String FS_COS_AUTH_PROVIDER = "fs.cosn.credentials.provider";
+
+ public static final String DLC_CATALOG_IMPL_CLASS =
+ "org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog";
+ public static final Map<String, String> DLC_DEFAULT_IMPL =
+ Collections.unmodifiableMap(new HashMap<String, String>() {
+ {
+ put(FS_LAKEFS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
+ put(FS_COS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
+ put(FS_COS_AUTH_PROVIDER, "org.apache.hadoop.fs.auth.SimpleCredentialProvider");
+ }
+ });
+}
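These keys are validated by the new DLCIcebergLoadNode below before a job is built. A minimal sketch of the properties map a caller supplies (values here are placeholders, mirroring the unit test further down):

    Map<String, String> props = new HashMap<>();
    // DLC API credentials (qcloud.dlc.*)
    props.put(DLCConstant.DLC_REGION, "ap-beijing");
    props.put(DLCConstant.DLC_SECRET_ID, "<secret-id>");
    props.put(DLCConstant.DLC_SECRET_KEY, "<secret-key>");
    // COS credentials for the lakefs storage layer (fs.cosn.*)
    props.put(DLCConstant.FS_COS_REGION, "ap-beijing");
    props.put(DLCConstant.FS_COS_SECRET_ID, "<secret-id>");
    props.put(DLCConstant.FS_COS_SECRET_KEY, "<secret-key>");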
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index bf796207a..393ffbc5c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -30,6 +30,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
+import org.apache.inlong.sort.protocol.node.load.DLCIcebergLoadNode;
import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
@@ -68,7 +69,8 @@ import java.util.Map;
@JsonSubTypes.Type(value = IcebergLoadNode.class, name = "icebergLoad"),
@JsonSubTypes.Type(value = ElasticsearchLoadNode.class, name = "elasticsearchLoad"),
@JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad"),
- @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad")
+ @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad"),
+ @JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name = "dlcIcebergLoad")
})
@NoArgsConstructor
@Data
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index c105e3c4f..d6d892b1f 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -32,6 +32,7 @@ import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
+import org.apache.inlong.sort.protocol.node.load.DLCIcebergLoadNode;
import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
@@ -81,7 +82,8 @@ import java.util.TreeMap;
@JsonSubTypes.Type(value = IcebergLoadNode.class, name = "icebergLoad"),
@JsonSubTypes.Type(value = ElasticsearchLoadNode.class, name = "elasticsearchLoad"),
@JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad"),
- @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad")
+ @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad"),
+ @JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name = "dlcIcebergLoad")
})
public interface Node {
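With these registrations, Jackson can round-trip the new node type through the generic Node/LoadNode interfaces. A minimal sketch (the ObjectMapper setup and the exact discriminator property name are assumptions; only the "dlcIcebergLoad" type name is defined in this patch):

    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(dlcNode);   // type info serialized as "dlcIcebergLoad"
    Node restored = mapper.readValue(json, Node.class); // resolved back to DLCIcebergLoadNode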
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
new file mode 100644
index 000000000..41891ad8f
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.DLCConstant;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@JsonTypeName("dlcIcebergLoad")
+@Data
+@NoArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+public class DLCIcebergLoadNode extends LoadNode implements Serializable {
+
+ private static final long serialVersionUID = -1L;
+
+ @JsonProperty("tableName")
+ @Nonnull
+ private String tableName;
+
+ @JsonProperty("dbName")
+ @Nonnull
+ private String dbName;
+
+ @JsonProperty("primaryKey")
+ private String primaryKey;
+
+ @JsonProperty("uri")
+ private String uri;
+
+ @JsonProperty("warehouse")
+ private String warehouse;
+
+ @JsonCreator
+ public DLCIcebergLoadNode(@JsonProperty("id") String id,
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelationShips,
+ @JsonProperty("filters") List<FilterFunction> filters,
+ @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+ @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+ @JsonProperty("properties") Map<String, String> properties,
+ @Nonnull @JsonProperty("dbName") String dbName,
+ @Nonnull @JsonProperty("tableName") String tableName,
+ @JsonProperty("primaryKey") String primaryKey,
+ @JsonProperty("uri") String uri,
+ @JsonProperty("warehouse") String warehouse) {
+ super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+ this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
+ this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
+ this.primaryKey = primaryKey;
+ this.uri = uri == null ? DLCConstant.DLC_ENDPOINT : uri;
+ this.warehouse = warehouse;
+ validateAuth(properties);
+ }
+
+ @Override
+ public Map<String, String> tableOptions() {
+ Map<String, String> options = super.tableOptions();
+ options.put("connector", "iceberg");
+ options.put("catalog-database", dbName);
+ options.put("catalog-table", tableName);
+ options.put("default-database", dbName);
+ options.put("catalog-name", CatalogType.HYBRIS.name());
+ options.put("catalog-impl", DLCConstant.DLC_CATALOG_IMPL_CLASS);
+ if (null != uri) {
+ options.put("uri", uri);
+ }
+ if (null != warehouse) {
+ options.put("warehouse", warehouse);
+ }
+ options.putAll(DLCConstant.DLC_DEFAULT_IMPL);
+ return options;
+ }
+
+ @Override
+ public String genTableName() {
+ return tableName;
+ }
+
+ @Override
+ public String getPrimaryKey() {
+ return primaryKey;
+ }
+
+ @Override
+ public List<FieldInfo> getPartitionFields() {
+ return super.getPartitionFields();
+ }
+
+ private void validateAuth(Map<String, String> properties) {
+ Preconditions.checkNotNull(properties);
+ Preconditions.checkNotNull(properties.get(DLCConstant.DLC_SECRET_ID), "dlc secret-id is null");
+ Preconditions.checkNotNull(properties.get(DLCConstant.DLC_SECRET_KEY), "dlc secret-key is null");
+ Preconditions.checkNotNull(properties.get(DLCConstant.DLC_REGION), "dlc region is null");
+
+ Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_REGION), "cos region is null");
+ Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_SECRET_ID), "cos secret-id is null");
+ Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_SECRET_KEY), "cos secret-key is null");
+ }
+}
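For a node built like the one in the test below (dbName "inlong", tableName "inlong_iceberg_dlc", no explicit uri, warehouse "/iceberg_dlc/warehouse"), tableOptions() adds the following entries on top of whatever super.tableOptions() contributes:

    connector        = iceberg
    catalog-database = inlong
    catalog-table    = inlong_iceberg_dlc
    default-database = inlong
    catalog-name     = HYBRIS
    catalog-impl     = org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog
    uri              = dlc.tencentcloudapi.com   (constructor default when uri is null)
    warehouse        = /iceberg_dlc/warehouse
    fs.lakefs.impl   = org.apache.hadoop.fs.CosFileSystem
    fs.cosn.impl     = org.apache.hadoop.fs.CosFileSystem
    fs.cosn.credentials.provider = org.apache.hadoop.fs.auth.SimpleCredentialProvider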
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNodeTest.java
new file mode 100644
index 000000000..0f690b75a
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNodeTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.DLCConstant;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * test for dlc load node
+ */
+public class DLCIcebergLoadNodeTest extends SerializeBaseTest<DLCIcebergLoadNode> {
+ @Override
+ public DLCIcebergLoadNode getTestObject() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(DLCConstant.DLC_REGION, "ap-beijing");
+ properties.put(DLCConstant.DLC_SECRET_ID, "XXXXXXXXXXX");
+ properties.put(DLCConstant.DLC_SECRET_KEY, "XXXXXXXXXXX");
+
+ properties.put(DLCConstant.FS_COS_REGION, "ap-beijing");
+ properties.put(DLCConstant.FS_COS_SECRET_ID, "XXXXXXXXXXX");
+ properties.put(DLCConstant.FS_COS_SECRET_KEY, "XXXXXXXXXXX");
+ return new DLCIcebergLoadNode(
+ "iceberg_dlc",
+ "iceberg_dlc_output",
+ Arrays.asList(new FieldInfo("field", new StringFormatInfo())),
+ Arrays.asList(new FieldRelation(new FieldInfo("field", new StringFormatInfo()),
+ new FieldInfo("field", new StringFormatInfo()))),
+ null,
+ null,
+ null,
+ properties,
+ "inlong",
+ "inlong_iceberg_dlc",
+ null,
+ null,
+ "/iceberg_dlc/warehouse");
+ }
+}
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/pom.xml b/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
new file mode 100644
index 000000000..c09d61bf6
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>sort-connectors</artifactId>
+ <groupId>org.apache.inlong</groupId>
+ <version>1.3.0-incubating-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>sort-connector-iceberg-dlc</artifactId>
+ <name>Apache InLong - Sort-connector-iceberg-dlc</name>
+ <packaging>jar</packaging>
+ <description>
+ DLC is a data lake product. Its storage is based on Iceberg.
+ Here is the product: https://cloud.tencent.com/product/dlc
+ </description>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.qcloud.cos</groupId>
+ <artifactId>hadoop-cos</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.qcloud</groupId>
+ <artifactId>cos_api-bundle</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.tencentcloudapi</groupId>
+ <artifactId>tencentcloud-sdk-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.qcloud</groupId>
+ <artifactId>dlc-data-catalog-metastore-client</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.tencentcloudapi</groupId>
+ <artifactId>tencentcloud-sdk-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-flink-runtime-1.13</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${dlc.hive.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.pentaho</groupId>
+ <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>org.apache.iceberg:*</include>
+ <include>org.apache.hive:hive-exec</include>
+
+ <!--tencent cloud-->
+ <include>com.qcloud.cos:*</include>
+ <include>com.qcloud:*</include>
+ <include>com.tencentcloudapi:*</include>
+ <include>commons-logging:commons-logging</include>
+ <include>com.squareup.okio:okio</include>
+ <include>com.squareup.okhttp:okhttp</include>
+ <include>com.squareup.okhttp:logging-interceptor</include>
+ <include>org.ini4j:ini4j</include>
+ <include>com.google.code.gson:gson</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.parquet</pattern>
+ <shadedPattern>org.apache.hive.shaded.parquet</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/CachedClientPool.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/CachedClientPool.java
new file mode 100644
index 000000000..7167d1dd0
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/CachedClientPool.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.catalog.hybris;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CachedClientPool implements ClientPool<IMetaStoreClient, TException> {
+ private static final Logger LOG = LoggerFactory.getLogger(CachedClientPool.class);
+ private static Cache<String, DLCWrappedHybrisClientPool> clientPoolCache;
+
+ private final Configuration conf;
+ private final String metastoreUri;
+ private final int clientPoolSize;
+ private final long evictionInterval;
+
+ CachedClientPool(Configuration conf, Map<String, String> properties) {
+ this.conf = conf;
+ this.metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+ this.clientPoolSize = PropertyUtil.propertyAsInt(properties,
+ CatalogProperties.CLIENT_POOL_SIZE,
+ CatalogProperties.CLIENT_POOL_SIZE_DEFAULT);
+ this.evictionInterval = PropertyUtil.propertyAsLong(properties,
+ CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
+ CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT);
+ init();
+ }
+
+ @VisibleForTesting
+ DLCWrappedHybrisClientPool clientPool() {
+ return clientPoolCache.get(metastoreUri, k -> new DLCWrappedHybrisClientPool(clientPoolSize, conf));
+ }
+
+ private synchronized void init() {
+ if (clientPoolCache == null) {
+ clientPoolCache = Caffeine.newBuilder().expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
+ .removalListener((key, value, cause) -> ((DLCWrappedHybrisClientPool) value).close())
+ .build();
+ }
+ }
+
+ @VisibleForTesting
+ static Cache<String, DLCWrappedHybrisClientPool> clientPoolCache() {
+ return clientPoolCache;
+ }
+
+ @Override
+ public <R> R run(Action<R, IMetaStoreClient, TException> action) throws TException, InterruptedException {
+ return clientPool().run(action);
+ }
+
+ @Override
+ public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
+ throws TException, InterruptedException {
+ return clientPool().run(action, retry);
+ }
+}
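Callers never hold a metastore client directly; every call goes through run(...), which borrows a pooled client keyed by the metastore URI. A one-line sketch of the pattern, as used by the catalog below (the database name is a placeholder):

    List<String> tableNames = clients.run(client -> client.getAllTables("inlong"));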
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DLCWrappedHybrisClientPool.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DLCWrappedHybrisClientPool.java
new file mode 100644
index 000000000..dcc20a14f
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DLCWrappedHybrisClientPool.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.catalog.hybris;
+
+import com.qcloud.dlc.metastore.DLCDataCatalogMetastoreClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.iceberg.ClientPoolImpl;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.hive.RuntimeMetaException;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * DLC Catalog client pool.
+ */
+public class DLCWrappedHybrisClientPool extends ClientPoolImpl<IMetaStoreClient, TException> {
+
+ // use appropriate ctor depending on whether we're working with Hive2 or Hive3 dependencies
+ // we need to do this because there is a breaking API change between Hive2 and Hive3
+ private static final DynConstructors.Ctor<DLCDataCatalogMetastoreClient> CLIENT_CTOR = DynConstructors.builder()
+ .impl(DLCDataCatalogMetastoreClient.class, HiveConf.class)
+ .impl(DLCDataCatalogMetastoreClient.class, Configuration.class).build();
+
+ private final HiveConf hiveConf;
+
+ public DLCWrappedHybrisClientPool(int poolSize, Configuration conf) {
+ // Do not allow retry by default as we rely on RetryingHiveClient
+ super(poolSize, TTransportException.class, false);
+ this.hiveConf = new HiveConf(conf, DLCWrappedHybrisClientPool.class);
+ this.hiveConf.addResource(conf);
+ }
+
+ @Override
+ protected IMetaStoreClient newClient() {
+ try {
+ try {
+ return CLIENT_CTOR.newInstance(hiveConf);
+ } catch (RuntimeException e) {
+ // any MetaException would be wrapped into RuntimeException during reflection,
+ // so let's double-check type here
+ if (e.getCause() instanceof MetaException) {
+ throw (MetaException) e.getCause();
+ }
+ throw e;
+ }
+ } catch (MetaException e) {
+ throw new RuntimeMetaException(e, "Failed to connect to Hive Metastore");
+ } catch (Throwable t) {
+ if (t.getMessage().contains("Another instance of Derby may have already booted")) {
+ throw new RuntimeMetaException(t, "Failed to start an embedded metastore because embedded "
+ + "Derby supports only one client at a time. To fix this, use a metastore that supports "
+ + "multiple clients.");
+ }
+
+ throw new RuntimeMetaException(t, "Failed to connect to Hive Metastore");
+ }
+ }
+
+ @Override
+ protected IMetaStoreClient reconnect(IMetaStoreClient client) {
+ try {
+ client.close();
+ client.reconnect();
+ } catch (MetaException e) {
+ throw new RuntimeMetaException(e, "Failed to reconnect to Hive Metastore");
+ }
+ return client;
+ }
+
+ @Override
+ protected boolean isConnectionException(Exception e) {
+ return super.isConnectionException(e) || (e != null && e instanceof MetaException
+ && e.getMessage().contains("Got exception: org.apache.thrift.transport.TTransportException"));
+ }
+
+ @Override
+ protected void close(IMetaStoreClient client) {
+ client.close();
+ }
+
+ @VisibleForTesting
+ HiveConf hiveConf() {
+ return hiveConf;
+ }
+}
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DlcWrappedHybrisCatalog.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DlcWrappedHybrisCatalog.java
new file mode 100644
index 000000000..a8f64db52
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DlcWrappedHybrisCatalog.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.catalog.hybris;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.qcloud.dlc.metastore.DLCDataCatalogMetastoreClient;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The only change from Iceberg's Hive catalog is the metastore client pool, switching
+ * {@link HiveMetaStoreClient} to {@link DLCDataCatalogMetastoreClient}
+ */
+public class DlcWrappedHybrisCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
+ public static final String LIST_ALL_TABLES = "list-all-tables";
+ public static final String LIST_ALL_TABLES_DEFAULT = "false";
+
+ // dlc auth
+ public static final String DLC_ENDPOINT = "qcloud.dlc.endpoint";
+
+
+ private static final Logger LOG = LoggerFactory.getLogger(DlcWrappedHybrisCatalog.class);
+
+ private String name;
+ private Configuration conf;
+ private FileIO fileIO;
+ private ClientPool<IMetaStoreClient, TException> clients;
+ private boolean listAllTables = false;
+
+ public DlcWrappedHybrisCatalog() {
+ }
+
+ @Override
+ public void initialize(String inputName, Map<String, String> properties) {
+ this.name = inputName;
+ if (conf == null) {
+ LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
+ this.conf = new Configuration();
+ }
+
+ if (properties.containsKey(CatalogProperties.URI)) {
+ this.conf.set(DLC_ENDPOINT, properties.get(CatalogProperties.URI));
+ }
+
+ if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
+ this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+ properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+ }
+
+ // dlc auth
+ properties.entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith("qcloud.dlc"))
+ .forEach(entry -> this.conf.set(entry.getKey(), entry.getValue()));
+
+ // lakefs auth
+ properties.entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith("fs"))
+ .forEach(entry -> this.conf.set(entry.getKey(), entry.getValue()));
+
+ this.listAllTables = Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));
+
+ String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+ this.fileIO = fileIOImpl == null
+ ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
+
+ this.clients = new CachedClientPool(conf, properties);
+ }
+
+ @Override
+ public List<TableIdentifier> listTables(Namespace namespace) {
+ Preconditions.checkArgument(isValidateNamespace(namespace),
+ "Missing database in namespace: %s", namespace);
+ String database = namespace.level(0);
+
+ try {
+ List<String> tableNames = clients.run(client -> client.getAllTables(database));
+ List<TableIdentifier> tableIdentifiers;
+
+ if (listAllTables) {
+ tableIdentifiers = tableNames.stream()
+ .map(t -> TableIdentifier.of(namespace, t))
+ .collect(Collectors.toList());
+ } else {
+ List<Table> tableObjects = clients.run(client -> client.getTableObjectsByName(database, tableNames));
+ tableIdentifiers = tableObjects.stream()
+ .filter(table -> table.getParameters() != null
+ && BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
+ .equalsIgnoreCase(
+ table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
+ .map(table -> TableIdentifier.of(namespace, table.getTableName()))
+ .collect(Collectors.toList());
+ }
+
+ LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, tableIdentifiers);
+ return tableIdentifiers;
+
+ } catch (UnknownDBException e) {
+ throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+
+ } catch (TException e) {
+ throw new RuntimeException("Failed to list all tables under namespace " + namespace, e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in call to listTables", e);
+ }
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public boolean dropTable(TableIdentifier identifier, boolean purge) {
+ if (!isValidIdentifier(identifier)) {
+ return false;
+ }
+
+ String database = identifier.namespace().level(0);
+
+ TableOperations ops = newTableOps(identifier);
+ TableMetadata lastMetadata;
+ if (purge && ops.current() != null) {
+ lastMetadata = ops.current();
+ } else {
+ lastMetadata = null;
+ }
+
+ try {
+ clients.run(client -> {
+ client.dropTable(database, identifier.name(),
+ false /* do not delete data */,
+ false /* throw NoSuchObjectException if the table doesn't exist */);
+ return null;
+ });
+
+ if (purge && lastMetadata != null) {
+ CatalogUtil.dropTableData(ops.io(), lastMetadata);
+ }
+
+ LOG.info("Dropped table: {}", identifier);
+ return true;
+
+ } catch (NoSuchTableException | NoSuchObjectException e) {
+ LOG.info("Skipping drop, table does not exist: {}", identifier, e);
+ return false;
+
+ } catch (TException e) {
+ throw new RuntimeException("Failed to drop " + identifier, e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in call to dropTable", e);
+ }
+ }
+
+ @Override
+ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
+ if (!isValidIdentifier(from)) {
+ throw new NoSuchTableException("Invalid identifier: %s", from);
+ }
+
+ TableIdentifier to = removeCatalogName(originalTo);
+ Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to);
+
+ String toDatabase = to.namespace().level(0);
+ String fromDatabase = from.namespace().level(0);
+ String fromName = from.name();
+
+ try {
+ Table table = clients.run(client -> client.getTable(fromDatabase, fromName));
+ HiveTableOperations.validateTableIsIceberg(table, fullTableName(name, from));
+
+ table.setDbName(toDatabase);
+ table.setTableName(to.name());
+
+ clients.run(client -> {
+ client.alter_table(fromDatabase, fromName, table);
+ return null;
+ });
+
+ LOG.info("Renamed table from {}, to {}", from, to);
+
+ } catch (NoSuchObjectException e) {
+ throw new NoSuchTableException("Table does not exist: %s", from);
+
+ } catch (AlreadyExistsException e) {
+ throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", to);
+
+ } catch (TException e) {
+ throw new RuntimeException("Failed to rename " + from + " to " + to, e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in call to rename", e);
+ }
+ }
+
+ @Override
+ public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
+ Preconditions.checkArgument(isValidIdentifier(identifier), "Invalid identifier: %s", identifier);
+
+ // Throw an exception if this table already exists in the catalog.
+ if (tableExists(identifier)) {
+ throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", identifier);
+ }
+
+ TableOperations ops = newTableOps(identifier);
+ InputFile metadataFile = fileIO.newInputFile(metadataFileLocation);
+ TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
+ ops.commit(null, metadata);
+
+ return new BaseTable(ops, identifier.toString());
+ }
+
+ @Override
+ public void createNamespace(Namespace namespace, Map<String, String> meta) {
+ Preconditions.checkArgument(
+ !namespace.isEmpty(),
+ "Cannot create namespace with invalid name: %s", namespace);
+ Preconditions.checkArgument(isValidateNamespace(namespace),
+ "Cannot support multi part namespace in Hive Metastore: %s", namespace);
+
+ try {
+ clients.run(client -> {
+ client.createDatabase(convertToDatabase(namespace, meta));
+ return null;
+ });
+
+ LOG.info("Created namespace: {}", namespace);
+
+ } catch (AlreadyExistsException e) {
+ throw new org.apache.iceberg.exceptions.AlreadyExistsException(e, "Namespace '%s' already exists!",
+ namespace);
+
+ } catch (TException e) {
+ throw new RuntimeException("Failed to create namespace " + namespace + " in Hive Metastore", e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ "Interrupted in call to createDatabase(name) " + namespace + " in Hive Metastore", e);
+ }
+ }
+
+ @Override
+ public List<Namespace> listNamespaces(Namespace namespace) {
+ if (!isValidateNamespace(namespace) && !namespace.isEmpty()) {
+ throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+ }
+ if (!namespace.isEmpty()) {
+ return ImmutableList.of();
+ }
+ try {
+ List<Namespace> namespaces = clients.run(IMetaStoreClient::getAllDatabases)
+ .stream()
+ .map(Namespace::of)
+ .collect(Collectors.toList());
+
+ LOG.debug("Listing namespace {} returned tables: {}", namespace, namespaces);
+ return namespaces;
+
+ } catch (TException e) {
+ throw new RuntimeException("Failed to list all namespace: " + namespace + " in Hive Metastore", e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ "Interrupted in call to getAllDatabases() " + namespace + " in Hive Metastore", e);
+ }
+ }
+
+ @Override
+ public boolean dropNamespace(Namespace namespace) {
+ if (!isValidateNamespace(namespace)) {
+ return false;
+ }
+
+ try {
+ clients.run(client -> {
+ client.dropDatabase(namespace.level(0),
+ false /* deleteData */,
+ false /* ignoreUnknownDb */,
+ false /* cascade */);
+ return null;
+ });
+
+ LOG.info("Dropped namespace: {}", namespace);
+ return true;
+
+ } catch (InvalidOperationException e) {
+ throw new NamespaceNotEmptyException(e, "Namespace %s is not empty. One or more tables exist.", namespace);
+
+ } catch (NoSuchObjectException e) {
+ return false;
+
+ } catch (TException e) {
+ throw new RuntimeException("Failed to drop namespace " + namespace + " in Hive Metastore", e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ "Interrupted in call to drop dropDatabase(name) " + namespace + " in Hive Metastore", e);
+ }
+ }
+
+ @Override
+ public boolean setProperties(Namespace namespace, Map<String, String> properties) {
+ Map<String, String> parameter = Maps.newHashMap();
+
+ parameter.putAll(loadNamespaceMetadata(namespace));
+ parameter.putAll(properties);
+ Database database = convertToDatabase(namespace, parameter);
+
+ alterHiveDataBase(namespace, database);
+ LOG.debug("Successfully set properties {} for {}", properties.keySet(), namespace);
+
+ // Always successful, otherwise exception is thrown
+ return true;
+ }
+
+ @Override
+ public boolean removeProperties(Namespace namespace, Set<String> properties) {
+ Map<String, String> parameter = Maps.newHashMap();
+
+ parameter.putAll(loadNamespaceMetadata(namespace));
+ properties.forEach(key -> parameter.put(key, null));
+ Database database = convertToDatabase(namespace, parameter);
+
+ alterHiveDataBase(namespace, database);
+ LOG.debug("Successfully removed properties {} from {}", properties, namespace);
+
+ // Always successful, otherwise exception is thrown
+ return true;
+ }
+
+ private void alterHiveDataBase(Namespace namespace, Database database) {
+ try {
+ clients.run(client -> {
+ client.alterDatabase(namespace.level(0), database);
+ return null;
+ });
+
+ } catch (NoSuchObjectException | UnknownDBException e) {
+ throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace);
+
+ } catch (TException e) {
+ throw new RuntimeException(
+ "Failed to list namespace under namespace: " + namespace + " in Hive Metastore", e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ "Interrupted in call to getDatabase(name) " + namespace + " in Hive Metastore", e);
+ }
+ }
+
+ @Override
+ public Map<String, String> loadNamespaceMetadata(Namespace namespace) {
+ if (!isValidateNamespace(namespace)) {
+ throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+ }
+
+ try {
+ Database database = clients.run(client -> client.getDatabase(namespace.level(0)));
+ Map<String, String> metadata = convertToMetadata(database);
+ LOG.debug("Loaded metadata for namespace {} found {}", namespace, metadata.keySet());
+ return metadata;
+
+ } catch (NoSuchObjectException | UnknownDBException e) {
+ throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace);
+
+ } catch (TException e) {
+ throw new RuntimeException(
+ "Failed to list namespace under namespace: " + namespace + " in Hive Metastore", e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ "Interrupted in call to getDatabase(name) " + namespace + " in Hive Metastore", e);
+ }
+ }
+
+ @Override
+ protected boolean isValidIdentifier(TableIdentifier tableIdentifier) {
+ return tableIdentifier.namespace().levels().length == 1;
+ }
+
+ private TableIdentifier removeCatalogName(TableIdentifier to) {
+ if (isValidIdentifier(to)) {
+ return to;
+ }
+
+ // check if the identifier includes the catalog name and remove it
+ if (to.namespace().levels().length == 2 && name().equalsIgnoreCase(to.namespace().level(0))) {
+ return TableIdentifier.of(Namespace.of(to.namespace().level(1)), to.name());
+ }
+
+ // return the original unmodified
+ return to;
+ }
+
+ private boolean isValidateNamespace(Namespace namespace) {
+ return namespace.levels().length == 1;
+ }
+
+ @Override
+ public TableOperations newTableOps(TableIdentifier tableIdentifier) {
+ String dbName = tableIdentifier.namespace().level(0);
+ String tableName = tableIdentifier.name();
+ return new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName);
+ }
+
+ @Override
+ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+ // This is a little edgy since we basically duplicate the HMS location generation logic.
+ // Sadly I do not see a good way around this if we want to keep the order of events, like:
+ // - Create meta files
+ // - Create the metadata in HMS, and this way committing the changes
+
+ // Create a new location based on the namespace / database if it is set on database level
+ try {
+ Database databaseData = clients.run(client -> client.getDatabase(tableIdentifier.namespace().levels()[0]));
+ if (databaseData.getLocationUri() != null) {
+ // If the database location is set use it as a base.
+ return String.format("%s/%s", databaseData.getLocationUri(), tableIdentifier.name());
+ }
+
+ } catch (TException e) {
+ throw new RuntimeException(String.format("Metastore operation failed for %s", tableIdentifier), e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during commit", e);
+ }
+
+ // Otherwise stick to the {WAREHOUSE_DIR}/{DB_NAME}.db/{TABLE_NAME} path
+ String warehouseLocation = getWarehouseLocation();
+ return String.format(
+ "%s/%s.db/%s",
+ warehouseLocation,
+ tableIdentifier.namespace().levels()[0],
+ tableIdentifier.name());
+ }
+
+ private String getWarehouseLocation() {
+ String warehouseLocation = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
+ Preconditions.checkNotNull(warehouseLocation,
+ "Warehouse location is not set: hive.metastore.warehouse.dir=null");
+ return warehouseLocation;
+ }
+
+ private Map<String, String> convertToMetadata(Database database) {
+
+ Map<String, String> meta = Maps.newHashMap();
+
+ meta.putAll(database.getParameters());
+ meta.put("location", database.getLocationUri());
+ if (database.getDescription() != null) {
+ meta.put("comment", database.getDescription());
+ }
+
+ return meta;
+ }
+
+ Database convertToDatabase(Namespace namespace, Map<String, String> meta) {
+ if (!isValidateNamespace(namespace)) {
+ throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+ }
+
+ Database database = new Database();
+ Map<String, String> parameter = Maps.newHashMap();
+
+ database.setName(namespace.level(0));
+ database.setLocationUri(new Path(getWarehouseLocation(), namespace.level(0)).toString() + ".db");
+
+ meta.forEach((key, value) -> {
+ if (key.equals("comment")) {
+ database.setDescription(value);
+ } else if (key.equals("location")) {
+ database.setLocationUri(value);
+ } else {
+ if (value != null) {
+ parameter.put(key, value);
+ }
+ }
+ });
+ database.setParameters(parameter);
+
+ return database;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("name", name)
+ .add("uri", this.conf == null ? "" : this.conf.get(HiveConf.ConfVars.METASTOREURIS.varname))
+ .toString();
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = new Configuration(conf);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @VisibleForTesting
+ void setListAllTables(boolean listAllTables) {
+ this.listAllTables = listAllTables;
+ }
+}
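A minimal sketch of standing the catalog up directly (property values are placeholders; the keys are the ones consumed by initialize() above):

    DlcWrappedHybrisCatalog catalog = new DlcWrappedHybrisCatalog();
    Map<String, String> props = Maps.newHashMap();
    props.put(CatalogProperties.URI, "dlc.tencentcloudapi.com");           // becomes qcloud.dlc.endpoint
    props.put(CatalogProperties.WAREHOUSE_LOCATION, "/iceberg_dlc/warehouse");
    props.put("qcloud.dlc.region", "ap-beijing");                          // qcloud.dlc.* keys copied into the conf
    props.put("fs.cosn.userinfo.region", "ap-beijing");                    // fs.* keys copied into the conf
    catalog.initialize("HYBRIS", props);
    List<TableIdentifier> tables = catalog.listTables(Namespace.of("inlong"));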
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/HiveTableOperations.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/HiveTableOperations.java
new file mode 100644
index 000000000..62cd7a939
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/HiveTableOperations.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.catalog.hybris;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.ConfigProperties;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.BiMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.iceberg.util.Tasks;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.GC_ENABLED;
+
+/**
+ * TODO we should be able to extract some more commonalities to BaseMetastoreTableOperations to
+ * avoid code duplication between this class and Metacat Tables.
+ */
+public class HiveTableOperations extends BaseMetastoreTableOperations {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class);
+
+ private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
+ private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
+ private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+ private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES = "iceberg.hive.metadata-refresh-max-retries";
+ private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = "iceberg.hive.table-level-lock-evict-ms";
+ private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
+ private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
+ private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+ private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2;
+ private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
+ private static final DynMethods.UnboundMethod ALTER_TABLE = DynMethods.builder("alter_table")
+ .impl(IMetaStoreClient.class, "alter_table_with_environmentContext",
+ String.class, String.class, Table.class, EnvironmentContext.class)
+ .impl(IMetaStoreClient.class, "alter_table",
+ String.class, String.class, Table.class, EnvironmentContext.class)
+ .impl(IMetaStoreClient.class, "alter_table",
+ String.class, String.class, Table.class)
+ .build();
+ private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION = ImmutableBiMap.of(
+ // gc.enabled in Iceberg and external.table.purge in Hive
+ // are meant to do the same things but with different names
+ GC_ENABLED, "external.table.purge"
+ );
+
+ private static Cache<String, ReentrantLock> commitLockCache;
+
+ private static synchronized void initTableLevelLockCache(long evictionTimeout) {
+ if (commitLockCache == null) {
+ commitLockCache = Caffeine.newBuilder()
+ .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+ .build();
+ }
+ }
+
+ /**
+ * Provides key translation where necessary between Iceberg and HMS props. This translation is needed because some
+ * properties control the same behaviour but are named differently in Iceberg and Hive. Therefore changes to these
+ * property pairs should be synchronized.
+ *
+ * Example: Deleting data files upon DROP TABLE is enabled using gc.enabled=true in Iceberg and
+ * external.table.purge=true in Hive. Hive and Iceberg users are unaware of each other's control flags, therefore
+ * inconsistent behaviour can occur from e.g. a Hive user's point of view if external.table.purge=true is set on the
+ * HMS table but gc.enabled=false is set on the Iceberg table, resulting in no data file deletion.
+ *
+ * @param hmsProp The HMS property that should be translated to Iceberg property
+ * @return Iceberg property equivalent to the hmsProp.
+ * If no such translation exists, the original hmsProp is returned
+ */
+ public static String translateToIcebergProp(String hmsProp) {
+ return ICEBERG_TO_HMS_TRANSLATION.inverse().getOrDefault(hmsProp, hmsProp);
+ }
+
+ private static class WaitingForLockException extends RuntimeException {
+ WaitingForLockException(String message) {
+ super(message);
+ }
+ }
+
+ private final String fullName;
+ private final String database;
+ private final String tableName;
+ private final Configuration conf;
+ private final long lockAcquireTimeout;
+ private final long lockCheckMinWaitTime;
+ private final long lockCheckMaxWaitTime;
+ private final int metadataRefreshMaxRetries;
+ private final FileIO fileIO;
+ private final ClientPool<IMetaStoreClient, TException> metaClients;
+
+ protected HiveTableOperations(Configuration conf, ClientPool metaClients, FileIO fileIO,
+ String catalogName, String database, String table) {
+ this.conf = conf;
+ this.metaClients = metaClients;
+ this.fileIO = fileIO;
+ this.fullName = catalogName + "." + database + "." + table;
+ this.database = database;
+ this.tableName = table;
+ this.lockAcquireTimeout =
+ conf.getLong(HIVE_ACQUIRE_LOCK_TIMEOUT_MS, HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT);
+ this.lockCheckMinWaitTime =
+ conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
+ this.lockCheckMaxWaitTime =
+ conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+ this.metadataRefreshMaxRetries =
+ conf.getInt(HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES,
+ HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT);
+ long tableLevelLockCacheEvictionTimeout =
+ conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+ initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
+ }
+
+ @Override
+ protected String tableName() {
+ return fullName;
+ }
+
+ @Override
+ public FileIO io() {
+ return fileIO;
+ }
+
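+ // Refreshes the table state: reads the metadata_location property from the HMS table and
+ // reloads the Iceberg metadata file it points to. A missing HMS table is only an error if
+ // metadata had been loaded before (i.e. the table existed and then disappeared).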
+ @Override
+ protected void doRefresh() {
+ String metadataLocation = null;
+ try {
+ Table table = metaClients.run(client -> client.getTable(database, tableName));
+ validateTableIsIceberg(table, fullName);
+
+ metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
+
+ } catch (NoSuchObjectException e) {
+ if (currentMetadataLocation() != null) {
+ throw new NoSuchTableException("No such table: %s.%s", database, tableName);
+ }
+
+ } catch (TException e) {
+ String errMsg = String.format("Failed to get table info from metastore %s.%s", database, tableName);
+ throw new RuntimeException(errMsg, e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during refresh", e);
+ }
+
+ refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries);
+ }
+
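+ // Commit protocol: write the new metadata file, take a process-level mutex plus an HMS table
+ // lock, verify the base metadata location is still current, persist the new location to HMS,
+ // and finally clean up and release the locks. A failure while persisting triggers a status
+ // check so that a commit which actually succeeded is not rolled back.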
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ @Override
+ protected void doCommit(TableMetadata base, TableMetadata metadata) {
+ String newMetadataLocation = base == null && metadata.metadataFileLocation() != null
+ ? metadata.metadataFileLocation() : writeNewMetadata(metadata, currentVersion() + 1);
+ boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
+ boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false);
+
+ CommitStatus commitStatus = CommitStatus.FAILURE;
+ boolean updateHiveTable = false;
+ Optional<Long> lockId = Optional.empty();
+ // getting a process-level lock per table to avoid concurrent commit attempts to the same table from the same
+ // JVM process, which would result in unnecessary and costly HMS lock acquisition requests
+ ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true));
+ tableLevelMutex.lock();
+ try {
+ lockId = Optional.of(acquireLock());
+ // TODO add lock heart beating for cases where default lock timeout is too low.
+
+ Table tbl = loadHmsTable();
+
+ if (tbl != null) {
+ // If we try to create the table but the metadata location is already set,
+ // then we had a concurrent commit
+ if (base == null
+ && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) != null) {
+ throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);
+ }
+
+ updateHiveTable = true;
+ LOG.debug("Committing existing table: {}", fullName);
+ } else {
+ tbl = newHmsTable();
+ LOG.debug("Committing new table: {}", fullName);
+ }
+
+ tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to pick up any schema changes
+
+ String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
+ String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
+ if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
+ throw new CommitFailedException(
+ "Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s",
+ baseMetadataLocation, metadataLocation, database, tableName);
+ }
+
+ // get Iceberg props that have been removed
+ Set<String> removedProps = Collections.emptySet();
+ if (base != null) {
+ removedProps = base.properties().keySet().stream()
+ .filter(key -> !metadata.properties().containsKey(key))
+ .collect(Collectors.toSet());
+ }
+
+ Map<String, String> summary = Optional.ofNullable(metadata.currentSnapshot())
+ .map(Snapshot::summary)
+ .orElseGet(ImmutableMap::of);
+ setHmsTableParameters(newMetadataLocation, tbl, metadata, removedProps, hiveEngineEnabled, summary);
+
+ if (!keepHiveStats) {
+ tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
+ }
+
+ try {
+ persistTable(tbl, updateHiveTable);
+ commitStatus = CommitStatus.SUCCESS;
+ } catch (Throwable persistFailure) {
+ LOG.error("Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
+ database, tableName, persistFailure);
+ commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+ switch (commitStatus) {
+ case SUCCESS:
+ break;
+ case FAILURE:
+ throw persistFailure;
+ case UNKNOWN:
+ throw new CommitStateUnknownException(persistFailure);
+ }
+ }
+ } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
+ throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);
+
+ } catch (TException | UnknownHostException e) {
+ if (e.getMessage() != null && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
+ throw new RuntimeException("Failed to acquire locks from metastore because 'HIVE_LOCKS' doesn't "
+ + "exist, this probably happened when using embedded metastore or doesn't create a "
+ + "transactional meta table. To fix this, use an alternative metastore", e);
+ }
+
+ throw new RuntimeException(String.format("Metastore operation failed for %s.%s", database, tableName), e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during commit", e);
+
+ } finally {
+ cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex);
+ }
+ }
+
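+ // Persists the HMS table: alter_table for an existing table (suppressing automatic stats
+ // updates via the environment context) or createTable for a new one.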
+ @VisibleForTesting
+ void persistTable(Table hmsTable, boolean updateHiveTable) throws TException, InterruptedException {
+ if (updateHiveTable) {
+ metaClients.run(client -> {
+ EnvironmentContext envContext = new EnvironmentContext(
+ ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE)
+ );
+ ALTER_TABLE.invoke(client, database, tableName, hmsTable, envContext);
+ return null;
+ });
+ } else {
+ metaClients.run(client -> {
+ client.createTable(hmsTable);
+ return null;
+ });
+ }
+ }
+
+ private Table loadHmsTable() throws TException, InterruptedException {
+ try {
+ return metaClients.run(client -> client.getTable(database, tableName));
+ } catch (NoSuchObjectException nte) {
+ LOG.trace("Table not found {}", fullName, nte);
+ return null;
+ }
+ }
+
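+ // Builds a bare EXTERNAL table shell for HMS; the storage descriptor and Iceberg parameters
+ // are filled in later during the commit.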
+ private Table newHmsTable() {
+ final long currentTimeMillis = System.currentTimeMillis();
+
+ Table newTable = new Table(tableName,
+ database,
+ System.getProperty("user.name"),
+ (int) (currentTimeMillis / 1000), // divide before casting: casting the raw millis to int overflows
+ (int) (currentTimeMillis / 1000),
+ Integer.MAX_VALUE,
+ null,
+ Collections.emptyList(),
+ Maps.newHashMap(),
+ null,
+ null,
+ TableType.EXTERNAL_TABLE.toString());
+
+ newTable.getParameters().put("EXTERNAL", "TRUE"); // using the external table type also requires this
+ return newTable;
+ }
+
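+ // Mirrors Iceberg table properties into HMS parameters: pushes current props (with key
+ // translation), drops obsolete ones, records the new and previous metadata locations, and
+ // copies snapshot summary counts into the basic Hive statistics.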
+ private void setHmsTableParameters(String newMetadataLocation, Table tbl, TableMetadata metadata,
+ Set<String> obsoleteProps, boolean hiveEngineEnabled,
+ Map<String, String> summary) {
+ Map<String, String> parameters = Optional.ofNullable(tbl.getParameters())
+ .orElseGet(Maps::newHashMap);
+
+ // push all Iceberg table properties into HMS
+ metadata.properties().forEach((key, value) -> {
+ // translate key names between Iceberg and HMS where needed
+ String hmsKey = ICEBERG_TO_HMS_TRANSLATION.getOrDefault(key, key);
+ parameters.put(hmsKey, value);
+ });
+ if (metadata.uuid() != null) {
+ parameters.put(TableProperties.UUID, metadata.uuid());
+ }
+
+ // remove any props from HMS that are no longer present in Iceberg table props
+ obsoleteProps.forEach(parameters::remove);
+
+ parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
+ parameters.put(METADATA_LOCATION_PROP, newMetadataLocation);
+
+ if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) {
+ parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation());
+ }
+
+ // If needed, set the 'storage_handler' property to enable querying from Hive
+ if (hiveEngineEnabled) {
+ parameters.put(hive_metastoreConstants.META_TABLE_STORAGE,
+ "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler");
+ } else {
+ parameters.remove(hive_metastoreConstants.META_TABLE_STORAGE);
+ }
+
+ // Set the basic statistics
+ if (summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP) != null) {
+ parameters.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
+ }
+ if (summary.get(SnapshotSummary.TOTAL_RECORDS_PROP) != null) {
+ parameters.put(StatsSetupConst.ROW_COUNT, summary.get(SnapshotSummary.TOTAL_RECORDS_PROP));
+ }
+ if (summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP) != null) {
+ parameters.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
+ }
+
+ tbl.setParameters(parameters);
+ }
+
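+ // Builds the HMS storage descriptor; when the Hive engine is enabled the Iceberg input/output
+ // formats and SerDe are used so the table is directly queryable from Hive.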
+ private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) {
+
+ final StorageDescriptor storageDescriptor = new StorageDescriptor();
+ storageDescriptor.setCols(HiveSchemaUtil.convert(metadata.schema()));
+ storageDescriptor.setLocation(metadata.location());
+ SerDeInfo serDeInfo = new SerDeInfo();
+ serDeInfo.setParameters(Maps.newHashMap());
+ if (hiveEngineEnabled) {
+ storageDescriptor.setInputFormat("org.apache.iceberg.mr.hive.HiveIcebergInputFormat");
+ storageDescriptor.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat");
+ serDeInfo.setSerializationLib("org.apache.iceberg.mr.hive.HiveIcebergSerDe");
+ } else {
+ storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileOutputFormat");
+ storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
+ serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
+ }
+ storageDescriptor.setSerdeInfo(serDeInfo);
+ return storageDescriptor;
+ }
+
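+ // Requests an exclusive table-level lock from the metastore, then polls checkLock with
+ // exponential backoff until the lock is acquired or the acquire timeout elapses. The lock is
+ // released again if this method exits without reaching the ACQUIRED state.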
+ @VisibleForTesting
+ long acquireLock() throws UnknownHostException, TException, InterruptedException {
+ final LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
+ lockComponent.setTablename(tableName);
+ final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent),
+ System.getProperty("user.name"),
+ InetAddress.getLocalHost().getHostName());
+ LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest));
+ AtomicReference<LockState> state = new AtomicReference<>(lockResponse.getState());
+ long lockId = lockResponse.getLockid();
+
+ final long start = System.currentTimeMillis();
+ long duration = 0;
+ boolean timeout = false;
+
+ try {
+ if (state.get().equals(LockState.WAITING)) {
+ // The retry count is nominally the upper bound on retries for Tasks.run(); in fact Tasks.run()
+ // makes up to `retries + 1` attempts. Here the timeout, not the retry count, bounds the lock
+ // checks, so a large retry count is reasonable. However, Integer.MAX_VALUE would make the
+ // `retries + 1` computation overflow into Integer.MIN_VALUE, so the count is set conservatively
+ // to `Integer.MAX_VALUE - 100` to stay clear of that boundary.
+ Tasks.foreach(lockId)
+ .retry(Integer.MAX_VALUE - 100)
+ .exponentialBackoff(
+ lockCheckMinWaitTime,
+ lockCheckMaxWaitTime,
+ lockAcquireTimeout,
+ 1.5)
+ .throwFailureWhenFinished()
+ .onlyRetryOn(WaitingForLockException.class)
+ .run(id -> {
+ try {
+ LockResponse response = metaClients.run(client -> client.checkLock(id));
+ LockState newState = response.getState();
+ state.set(newState);
+ if (newState.equals(LockState.WAITING)) {
+ throw new WaitingForLockException("Waiting for lock.");
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted(); // Clear the interrupt status flag
+ LOG.warn("Interrupted while waiting for lock.", e);
+ }
+ }, TException.class);
+ }
+ } catch (WaitingForLockException waitingForLockException) {
+ timeout = true;
+ duration = System.currentTimeMillis() - start;
+ } finally {
+ if (!state.get().equals(LockState.ACQUIRED)) {
+ unlock(Optional.of(lockId));
+ }
+ }
+
+ // timed out without acquiring the lock
+ if (timeout && !state.get().equals(LockState.ACQUIRED)) {
+ throw new CommitFailedException("Timed out after %s ms waiting for lock on %s.%s",
+ duration, database, tableName);
+ }
+
+ if (!state.get().equals(LockState.ACQUIRED)) {
+ throw new CommitFailedException("Could not acquire the lock on %s.%s, "
+ + "lock request ended in state %s", database, tableName, state);
+ }
+ return lockId;
+ }
+
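+ // Deletes the uncommitted metadata file only when the commit is known to have failed; for an
+ // UNKNOWN status the file must be kept, since the commit may still have gone through. The HMS
+ // lock and the process-level mutex are released in all cases.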
+ private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, Optional<Long> lockId,
+ ReentrantLock tableLevelMutex) {
+ try {
+ if (commitStatus == CommitStatus.FAILURE) {
+ // If we are sure the commit failed, clean up the uncommitted metadata file
+ io().deleteFile(metadataLocation);
+ }
+ } catch (RuntimeException e) {
+ LOG.error("Fail to cleanup metadata file at {}", metadataLocation, e);
+ throw e;
+ } finally {
+ unlock(lockId);
+ tableLevelMutex.unlock();
+ }
+ }
+
+ private void unlock(Optional<Long> lockId) {
+ if (lockId.isPresent()) {
+ try {
+ doUnlock(lockId.get());
+ } catch (Exception e) {
+ LOG.warn("Failed to unlock {}.{}", database, tableName, e);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void doUnlock(long lockId) throws TException, InterruptedException {
+ metaClients.run(client -> {
+ client.unlock(lockId);
+ return null;
+ });
+ }
+
+ static void validateTableIsIceberg(Table table, String fullName) {
+ String tableType = table.getParameters().get(TABLE_TYPE_PROP);
+ NoSuchIcebergTableException.check(tableType != null && tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE),
+ "Not an iceberg table: %s (type=%s)", fullName, tableType);
+ }
+
+ /**
+ * Returns whether the Hive engine related values should be enabled on the table.
+ * <p>
+ * The decision is made like this:
+ * <ol>
+ * <li>Table property value {@link TableProperties#ENGINE_HIVE_ENABLED}
+ * <li>If the table property is not set then check the hive-site.xml property value
+ * {@link ConfigProperties#ENGINE_HIVE_ENABLED}
+ * <li>If neither of the above is set, use the default value
+ * {@link TableProperties#ENGINE_HIVE_ENABLED_DEFAULT}
+ * </ol>
+ * @param metadata Table metadata to use
+ * @param conf The hive configuration to use
+ * @return if the hive engine related values should be enabled or not
+ */
+ private static boolean hiveEngineEnabled(TableMetadata metadata, Configuration conf) {
+ if (metadata.properties().get(TableProperties.ENGINE_HIVE_ENABLED) != null) {
+ // We know that the property is set, so the default value will not be used.
+ return metadata.propertyAsBoolean(TableProperties.ENGINE_HIVE_ENABLED, false);
+ }
+
+ return conf.getBoolean(ConfigProperties.ENGINE_HIVE_ENABLED, TableProperties.ENGINE_HIVE_ENABLED_DEFAULT);
+ }
+}
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index f0417cee3..8431d6867 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -40,7 +40,9 @@
<module>kafka</module>
<module>jdbc</module>
<module>pulsar</module>
+
<module>iceberg</module>
+ <module>iceberg-dlc</module>
<module>hbase</module>
<module>postgres-cdc</module>
<module>mongodb-cdc</module>
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DLCIcebergSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DLCIcebergSqlParseTest.java
new file mode 100644
index 000000000..5857fc70c
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DLCIcebergSqlParseTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.constant.DLCConstant;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.DLCIcebergLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test DLC Iceberg SQL parsing
+ */
+public class DLCIcebergSqlParseTest {
+ private MySqlExtractNode buildMySQLExtractNode(String id) {
+ List<FieldInfo> fields = Arrays.asList(
+ new FieldInfo("id", new IntFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()));
+ // to use append mode for the load, add the corresponding config to this map
+ Map<String, String> map = new HashMap<>();
+ return new MySqlExtractNode(id,
+ "mysql_input",
+ fields,
+ null,
+ map,
+ "id",
+ Collections.singletonList("test"),
+ "localhost",
+ "root",
+ "123456",
+ "inlong",
+ 3306,
+ null,
+ null,
+ null);
+ }
+
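+ // builds the DLC load node; the region and secret values below are placeholders and must be
+ // replaced with real Tencent Cloud credentials before running against an actual DLC instance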
+ private DLCIcebergLoadNode buildDLCLoadNode() {
+ // set HIVE_CONF_DIR, or set the uri and warehouse explicitly
+ Map<String, String> properties = new HashMap<>();
+ properties.put(DLCConstant.DLC_REGION, "ap-beijing");
+ properties.put(DLCConstant.DLC_SECRET_ID, "XXXXXXXXXXX");
+ properties.put(DLCConstant.DLC_SECRET_KEY, "XXXXXXXXXXX");
+
+ properties.put(DLCConstant.FS_COS_REGION, "ap-beijing");
+ properties.put(DLCConstant.FS_COS_SECRET_ID, "XXXXXXXXXXX");
+ properties.put(DLCConstant.FS_COS_SECRET_KEY, "XXXXXXXXXXX");
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new IntFormatInfo()),
+ new FieldInfo("id", new IntFormatInfo())),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo())));
+ List<FieldInfo> fields = Arrays.asList(
+ new FieldInfo("id", new IntFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()));
+ return new DLCIcebergLoadNode(
+ "iceberg",
+ "iceberg_output",
+ fields,
+ relations,
+ null,
+ null,
+ null,
+ properties,
+ "inlong",
+ "dlc_output",
+ "id",
+ null,
+ "/hive/warehouse");
+ }
+
+ /**
+ * build node relation
+ *
+ * @param inputs extract node
+ * @param outputs load node
+ * @return node relation
+ */
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+ return new NodeRelation(inputIds, outputIds);
+ }
+
+ /**
+ * Test MySQL to DLC
+ *
+ * @throws Exception The exception may be thrown when executing the case
+ */
+ @Test
+ public void testDLCIceberg() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ Node inputNode = buildMySQLExtractNode("1");
+ Node outputNode = buildDLCLoadNode();
+ StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(inputNode, outputNode),
+ Arrays.asList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("group_id", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ FlinkSqlParseResult result = (FlinkSqlParseResult) parser.parse();
+ Assert.assertTrue(!result.getLoadSqls().isEmpty() && !result.getCreateTableSqls().isEmpty());
+ }
+}
\ No newline at end of file
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 1fda35242..ee1c8d287 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -756,6 +756,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
org.codehaus.jackson:jackson-mapper-asl:1.9.13 - Data Mapper for Jackson (https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-mapper-asl/1.9.13), (The Apache Software License, Version 2.0)
com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.7.8 - Jackson module: JAXB Annotations (https://github.com/FasterXML/jackson-modules-base), (The Apache Software License, Version 2.0)
com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.10.5 - Jackson module: JAXB Annotations (https://github.com/FasterXML/jackson-modules-base), (The Apache Software License, Version 2.0)
+ com.qcloud:cos_api-bundle:5.6.35 - COS SDK for Java - Bundle (The Apache Software License, Version 2.0)
org.codehaus.jackson:jackson-xc:1.9.2 - Xml Compatibility extensions for Jackson (https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-xc/1.9.2), (The Apache Software License, Version 2.0; GNU Lesser General Public License (LGPL), Version 2.1)
org.javassist:javassist:3.24.0-GA - Javassist (https://github.com/jboss-javassist/javassist/tree/rel_3_24_0_ga), (Apache License 2.0; MPL 1.1; LGPL 2.1)
org.mortbay.jetty:jetty:6.1.26 - Jetty Server (http://www.eclipse.org/jetty/jetty-parent/project/modules/jetty), (Apache Software License - Version 2.0; EPL 1.0)
diff --git a/licenses/inlong-sort-connectors/licenses/LICENSE-cos_api-bundle.txt b/licenses/inlong-sort-connectors/licenses/LICENSE-cos_api-bundle.txt
new file mode 100644
index 000000000..75b52484e
--- /dev/null
+++ b/licenses/inlong-sort-connectors/licenses/LICENSE-cos_api-bundle.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/licenses/inlong-sort-connectors/notices/NOTICE-cos_api-bundle.txt b/licenses/inlong-sort-connectors/notices/NOTICE-cos_api-bundle.txt
new file mode 100644
index 000000000..7b2ab9c06
--- /dev/null
+++ b/licenses/inlong-sort-connectors/notices/NOTICE-cos_api-bundle.txt
@@ -0,0 +1,8 @@
+
+Apache HttpClient
+Copyright 1999-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
diff --git a/pom.xml b/pom.xml
index 684f7d884..f4f87f42d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -238,6 +238,10 @@
<jcommander.version>1.78</jcommander.version>
<je.version>7.3.7</je.version>
<tencentcloud.cls.version>1.0.5</tencentcloud.cls.version>
+ <tencentcloud.api.version>3.1.439</tencentcloud.api.version>
+ <cos.hadoop.version>2.7.5-5.9.3</cos.hadoop.version>
+ <cos.bundle.version>5.6.35</cos.bundle.version>
+ <dlc.client.version>1.0</dlc.client.version>
<esri-geometry-api.version>2.0.0</esri-geometry-api.version>
<HikariCP.version>4.0.3</HikariCP.version>
@@ -1011,6 +1015,11 @@
<artifactId>iceberg-flink-runtime-1.13</artifactId>
<version>${iceberg.flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-flink-runtime-1.13</artifactId>
+ <version>${iceberg.flink.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -1498,6 +1507,26 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.tencentcloudapi</groupId>
+ <artifactId>tencentcloud-sdk-java</artifactId>
+ <version>${tencentcloud.api.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.qcloud.cos</groupId>
+ <artifactId>hadoop-cos</artifactId>
+ <version>${cos.hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.qcloud</groupId>
+ <artifactId>cos_api-bundle</artifactId>
+ <version>${cos.bundle.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.qcloud</groupId>
+ <artifactId>dlc-data-catalog-metastore-client</artifactId>
+ <version>${dlc.client.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>