Posted to commits@inlong.apache.org by zi...@apache.org on 2022/09/08 07:59:23 UTC
[inlong] 02/02: [INLONG-5193][Sort] Add dlc small file compact feature and adapt newest auth method (#5243)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 31ea1afd9e729a0664f1b652680208d32680fe78
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Tue Sep 6 19:15:57 2022 +0800
[INLONG-5193][Sort] Add dlc small file compact feature and adapt newest auth method (#5243)
---
.../inlong/sort/protocol/constant/DLCConstant.java | 33 +-
.../protocol/node/load/DLCIcebergLoadNode.java | 21 +-
.../protocol/node/load/DLCIcebergLoadNodeTest.java | 5 +-
.../inlong/sort/base/metric/MetricOption.java | 89 ++
.../inlong/sort/base/metric/SinkMetricData.java | 9 +-
.../inlong/sort/base/metric/SourceMetricData.java | 26 +-
inlong-sort/sort-connectors/iceberg-dlc/pom.xml | 26 +-
.../catalog/hybris/DlcWrappedHybrisCatalog.java | 45 +-
.../sort/iceberg/flink/CompactTableProperties.java | 98 ++
.../inlong/sort/iceberg/flink}/FlinkCatalog.java | 1463 ++++++++++----------
.../sort/iceberg/flink}/FlinkCatalogFactory.java | 333 ++---
.../iceberg/flink}/FlinkDynamicTableFactory.java | 455 +++---
.../sort/iceberg/flink}/IcebergTableSink.java | 47 +-
.../sort/iceberg/flink/actions/RewriteResult.java | 32 +
.../flink/actions/SyncRewriteDataFilesAction.java | 133 ++
.../actions/SyncRewriteDataFilesActionOption.java | 171 +++
.../iceberg/flink/sink/BaseDeltaTaskWriter.java | 122 ++
.../sort/iceberg/flink}/sink/DeltaManifests.java | 5 +-
.../flink}/sink/DeltaManifestsSerializer.java | 5 +-
.../iceberg/flink}/sink/FlinkManifestUtil.java | 12 +-
.../inlong/sort/iceberg/flink}/sink/FlinkSink.java | 1034 +++++++-------
.../iceberg/flink}/sink/IcebergFilesCommitter.java | 91 +-
.../iceberg/flink}/sink/IcebergStreamWriter.java | 33 +-
.../flink}/sink/ManifestOutputFileFactory.java | 13 +-
.../iceberg/flink}/sink/PartitionKeySelector.java | 4 +-
.../iceberg/flink/sink/PartitionedDeltaWriter.java | 91 ++
.../flink/sink/RowDataTaskWriterFactory.java | 140 ++
.../flink/sink/UnpartitionedDeltaWriter.java | 64 +
.../org.apache.flink.table.factories.Factory | 33 +
.../apache/inlong/sort/iceberg/FlinkCatalog.java | 2 +
.../inlong/sort/iceberg/FlinkCatalogFactory.java | 2 +
.../sort/iceberg/FlinkDynamicTableFactory.java | 2 +-
.../inlong/sort/iceberg/IcebergTableSink.java | 9 +
.../sort/iceberg/sink/BaseDeltaTaskWriter.java | 122 ++
.../inlong/sort/iceberg/sink/DeltaManifests.java | 3 +
.../iceberg/sink/DeltaManifestsSerializer.java | 3 +
.../sort/iceberg/sink/FlinkManifestUtil.java | 3 +
.../apache/inlong/sort/iceberg/sink/FlinkSink.java | 28 +-
.../sort/iceberg/sink/IcebergFilesCommitter.java | 3 +
.../sort/iceberg/sink/IcebergStreamWriter.java | 3 +
.../iceberg/sink/ManifestOutputFileFactory.java | 3 +
.../sort/iceberg/sink/PartitionKeySelector.java | 2 +
.../sort/iceberg/sink/PartitionedDeltaWriter.java | 91 ++
.../iceberg/sink/RowDataTaskWriterFactory.java | 140 ++
.../iceberg/sink/UnpartitionedDeltaWriter.java | 64 +
inlong-sort/sort-core/pom.xml | 10 +-
.../inlong/sort/parser/DLCIcebergSqlParseTest.java | 10 +-
licenses/inlong-manager/LICENSE | 10 +-
licenses/inlong-sort-connectors/LICENSE | 53 +-
pom.xml | 19 +-
50 files changed, 3437 insertions(+), 1778 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DLCConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DLCConstant.java
index 1788cd5ba..e4ef7d310 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DLCConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DLCConstant.java
@@ -27,36 +27,45 @@ public class DLCConstant {
* DLC internet access domain name.
*/
public static final String DLC_ENDPOINT = "dlc.tencentcloudapi.com";
-
+ // ============================== DLC AUTH PARAMS(Required) =====================================
/**
* dlc account region
*/
- public static final String DLC_REGION = "qcloud.dlc.region";
+ public static final String DLC_REGION = "qcloud.dlc.region";
/**
* dlc account secret id
*/
- public static final String DLC_SECRET_ID = "qcloud.dlc.secret-id";
+ public static final String DLC_SECRET_ID = "qcloud.dlc.secret-id";
/**
* dlc account secret key
*/
- public static final String DLC_SECRET_KEY = "qcloud.dlc.secret-key";
-
+ public static final String DLC_SECRET_KEY = "qcloud.dlc.secret-key";
/**
- * dlc cos region
+ * Current user appid.
*/
- public static final String FS_COS_REGION = "fs.cosn.userinfo.region";
+ public static final String DLC_USER_APPID = "qcloud.dlc.user.appid";
/**
- * dlc main account cos secret id
+ * Managed account uid.
*/
- public static final String FS_COS_SECRET_ID = "fs.cosn.userinfo.secretId";
+ public static final String DLC_MANAGED_ACCOUNT_UID = "qcloud.dlc.managed.account.uid";
/**
- * dlc main account cos secret key
+ * dlc jdbc url(optional)
*/
- public static final String FS_COS_SECRET_KEY = "fs.cosn.userinfo.secretKey";
+ public static final String DLC_JDBC_URL = "qcloud.dlc.jdbc.url";
+ // ============================== FS CREDENTIALS AUTH PARAMS =====================================
public static final String FS_LAKEFS_IMPL = "fs.lakefs.impl";
public static final String FS_COS_IMPL = "fs.cosn.impl";
public static final String FS_COS_AUTH_PROVIDER = "fs.cosn.credentials.provider";
+ public static final String FS_COS_REGION = "fs.cosn.userinfo.region";
+ public static final String FS_COS_SECRET_ID = "fs.cosn.userinfo.secretId";
+ public static final String FS_COS_SECRET_KEY = "fs.cosn.userinfo.secretKey";
+
+ public static final String FS_AUTH_DLC_SECRET_ID = "service.secret.id";
+ public static final String FS_AUTH_DLC_SECRET_KEY = "service.secret.key";
+ public static final String FS_AUTH_DLC_REGION = "service.region";
+ public static final String FS_AUTH_DLC_ACCOUNT_APPID = "user.appid";
+ public static final String FS_AUTH_DLC_MANAGED_ACCOUNT_UID = "request.identity.token";
public static final String DLC_CATALOG_IMPL_CLASS =
"org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog";
@@ -65,7 +74,7 @@ public class DLCConstant {
{
put(FS_LAKEFS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
put(FS_COS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
- put(FS_COS_AUTH_PROVIDER, "org.apache.hadoop.fs.auth.SimpleCredentialProvider");
+ put(FS_COS_AUTH_PROVIDER, "org.apache.hadoop.fs.auth.DlcCloudCredentialsProvider");
}
});
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
index 5e0f1dc67..dcb55426a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNode.java
@@ -91,9 +91,7 @@ public class DLCIcebergLoadNode extends LoadNode implements InlongMetric, Serial
@Override
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
- options.put("connector", "iceberg-inlong");
- // for test sink.ignore.changelog
- // options.put("sink.ignore.changelog", "true");
+ options.put("connector", "dlc-inlong");
options.put("catalog-database", dbName);
options.put("catalog-table", tableName);
options.put("default-database", dbName);
@@ -106,6 +104,16 @@ public class DLCIcebergLoadNode extends LoadNode implements InlongMetric, Serial
options.put("warehouse", warehouse);
}
options.putAll(DLCConstant.DLC_DEFAULT_IMPL);
+ // for filesystem auth
+ options.put(DLCConstant.FS_COS_REGION, options.get(DLCConstant.DLC_REGION));
+ options.put(DLCConstant.FS_COS_SECRET_ID, options.get(DLCConstant.DLC_SECRET_ID));
+ options.put(DLCConstant.FS_COS_SECRET_KEY, options.get(DLCConstant.DLC_SECRET_KEY));
+
+ options.put(DLCConstant.FS_AUTH_DLC_SECRET_ID, options.get(DLCConstant.DLC_SECRET_ID));
+ options.put(DLCConstant.FS_AUTH_DLC_SECRET_KEY, options.get(DLCConstant.DLC_SECRET_KEY));
+ options.put(DLCConstant.FS_AUTH_DLC_REGION, options.get(DLCConstant.DLC_REGION));
+ options.put(DLCConstant.FS_AUTH_DLC_ACCOUNT_APPID, options.get(DLCConstant.DLC_USER_APPID));
+ options.put(DLCConstant.FS_AUTH_DLC_MANAGED_ACCOUNT_UID, options.get(DLCConstant.DLC_MANAGED_ACCOUNT_UID));
return options;
}
@@ -129,9 +137,8 @@ public class DLCIcebergLoadNode extends LoadNode implements InlongMetric, Serial
Preconditions.checkNotNull(properties.get(DLCConstant.DLC_SECRET_ID), "dlc secret-id is null");
Preconditions.checkNotNull(properties.get(DLCConstant.DLC_SECRET_KEY), "dlc secret-key is null");
Preconditions.checkNotNull(properties.get(DLCConstant.DLC_REGION), "dlc region is null");
-
- Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_REGION), "cos region is null");
- Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_SECRET_ID), "cos secret-id is null");
- Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_SECRET_KEY), "cos secret-key is null");
+ Preconditions.checkNotNull(properties.get(DLCConstant.DLC_USER_APPID), "dlc user appid is null");
+ Preconditions.checkNotNull(
+ properties.get(DLCConstant.DLC_MANAGED_ACCOUNT_UID), "dlc managed account appid is null");
}
}
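For context, a minimal sketch of the load-node properties that the new validation above expects; the values are placeholders and the keys are the DLCConstant fields introduced earlier in this patch:

    // Hedged sketch: placeholder credentials, keys taken from DLCConstant above.
    Map<String, String> properties = new HashMap<>();
    properties.put(DLCConstant.DLC_REGION, "ap-beijing");
    properties.put(DLCConstant.DLC_SECRET_ID, "<secret-id>");
    properties.put(DLCConstant.DLC_SECRET_KEY, "<secret-key>");
    properties.put(DLCConstant.DLC_USER_APPID, "<user-appid>");
    properties.put(DLCConstant.DLC_MANAGED_ACCOUNT_UID, "<managed-account-uid>");
    // tableOptions() then copies these into the fs.cosn.* and service.* keys for filesystem auth.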
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNodeTest.java
index 0f690b75a..a12ab5f0a 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DLCIcebergLoadNodeTest.java
@@ -38,10 +38,9 @@ public class DLCIcebergLoadNodeTest extends SerializeBaseTest<DLCIcebergLoadNode
properties.put(DLCConstant.DLC_REGION, "ap-beijing");
properties.put(DLCConstant.DLC_SECRET_ID, "XXXXXXXXXXX");
properties.put(DLCConstant.DLC_SECRET_KEY, "XXXXXXXXXXX");
+ properties.put(DLCConstant.DLC_USER_APPID, "XXXXXXXXXXX");
+ properties.put(DLCConstant.DLC_MANAGED_ACCOUNT_UID, "XXXXXXXXXXX");
- properties.put(DLCConstant.FS_COS_REGION, "ap-beijing");
- properties.put(DLCConstant.FS_COS_SECRET_ID, "XXXXXXXXXXX");
- properties.put(DLCConstant.FS_COS_SECRET_KEY, "XXXXXXXXXXX");
return new DLCIcebergLoadNode(
"iceberg_dlc",
"iceberg_dlc_output",
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
new file mode 100644
index 000000000..d2179ae54
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
+
+import javax.annotation.Nullable;
+import java.util.HashSet;
+import java.util.regex.Pattern;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+
+public class MetricOption {
+ private static final String IP_OR_HOST_PORT = "^(.*):([0-9]|[1-9]\\d|[1-9]\\d{"
+ + "2}|[1-9]\\d{"
+ + "3}|[1-5]\\d{"
+ + "4}|6[0-4]\\d{"
+ + "3}|65[0-4]\\d{"
+ + "2}|655[0-2]\\d|6553[0-5])$";
+
+ private final String groupId;
+ private final String streamId;
+ private final String nodeId;
+ private final HashSet<String> ipPortList;
+ private String ipPorts;
+
+ public MetricOption(String inLongMetric) {
+ this(inLongMetric, null);
+ }
+
+ public MetricOption(String inLongMetric, @Nullable String inLongAudit) {
+ ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inLongMetric, inLongAudit);
+ String[] inLongMetricArray = inLongMetric.split(DELIMITER);
+ Preconditions.checkArgument(inLongMetricArray.length == 3,
+ "Error inLong metric format: " + inLongMetric);
+ this.groupId = inLongMetricArray[0];
+ this.streamId = inLongMetricArray[1];
+ this.nodeId = inLongMetricArray[2];
+ this.ipPortList = new HashSet<>();
+ this.ipPorts = null;
+
+ if (inLongAudit != null) {
+ String[] ipPortStrs = inLongAudit.split(DELIMITER);
+ this.ipPorts = inLongAudit;
+ for (String ipPort : ipPortStrs) {
+ Preconditions.checkArgument(Pattern.matches(IP_OR_HOST_PORT, ipPort),
+ "Error inLong audit format: " + inLongAudit);
+ this.ipPortList.add(ipPort);
+ }
+ }
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public String getStreamId() {
+ return streamId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public HashSet<String> getIpPortList() {
+ return ipPortList;
+ }
+
+ public String getIpPorts() {
+ return ipPorts;
+ }
+}
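A quick usage sketch of the new MetricOption class; this assumes Constants.DELIMITER is "&" (the delimiter value itself is not shown in this patch) and the audit endpoint is illustrative:

    // Hedged sketch: "&"-delimited metric label plus an illustrative audit host:port.
    MetricOption option = new MetricOption("groupId&streamId&nodeId", "127.0.0.1:10081");
    String groupId = option.getGroupId();   // "groupId"
    String ipPorts = option.getIpPorts();   // "127.0.0.1:10081"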
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 0daff99a9..67b47657e 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -56,10 +56,11 @@ public class SinkMetricData implements MetricData {
private Meter numBytesOutPerSecond;
public SinkMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
- this.metricGroup = metricGroup;
- this.groupId = groupId;
- this.streamId = streamId;
- this.nodeId = nodeId;
+ this(groupId, streamId, nodeId, metricGroup, null);
+ }
+
+ public SinkMetricData(MetricOption option, MetricGroup metricGroup) {
+ this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts());
}
public SinkMetricData(
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index bd82cc041..d97efc9f5 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -25,6 +25,12 @@ import org.apache.flink.metrics.SimpleCounter;
import org.apache.inlong.audit.AuditImp;
import org.apache.inlong.sort.base.Constants;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
@@ -46,7 +52,11 @@ public class SourceMetricData implements MetricData {
private final AuditImp auditImp;
public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup) {
- this(groupId, streamId, nodeId, metricGroup, null);
+ this(groupId, streamId, nodeId, metricGroup, (AuditImp) null);
+ }
+
+ public SourceMetricData(MetricOption option, MetricGroup metricGroup) {
+ this(option.getGroupId(), option.getStreamId(), option.getNodeId(), metricGroup, option.getIpPorts());
}
public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup,
@@ -58,6 +68,20 @@ public class SourceMetricData implements MetricData {
this.auditImp = auditImp;
}
+ public SourceMetricData(String groupId, String streamId, String nodeId, MetricGroup metricGroup,
+ @Nullable String auditHostAndPorts) {
+ this.groupId = groupId;
+ this.streamId = streamId;
+ this.nodeId = nodeId;
+ this.metricGroup = metricGroup;
+ if (auditHostAndPorts != null) {
+ AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+ this.auditImp = AuditImp.getInstance();
+ } else {
+ this.auditImp = null;
+ }
+ }
+
/**
* Default counter is {@link SimpleCounter}
* groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
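With the constructors added above, the same option object can be handed straight to the metric data classes; a minimal sketch, where the MetricGroup would come from the operator's runtime context in practice:

    // Hedged sketch: metricGroup is obtained from Flink's RuntimeContext in a real operator.
    SourceMetricData sourceMetrics = new SourceMetricData(option, metricGroup);
    SinkMetricData sinkMetrics = new SinkMetricData(option, metricGroup);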
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/pom.xml b/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
index be760589e..86d0752c3 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
+++ b/inlong-sort/sort-connectors/iceberg-dlc/pom.xml
@@ -36,6 +36,20 @@
</description>
<dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.qcloud.cos</groupId>
+ <artifactId>lakefs-cloud-plugin</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.qcloud</groupId>
+ <artifactId>dlc-jdbc</artifactId>
+ </dependency>
+
<dependency>
<groupId>com.qcloud.cos</groupId>
<artifactId>hadoop-cos</artifactId>
@@ -46,11 +60,12 @@
</dependency>
<dependency>
<groupId>com.tencentcloudapi</groupId>
- <artifactId>tencentcloud-sdk-java</artifactId>
+ <artifactId>tencentcloud-sdk-java</artifactId><!--TODO:replaced by 558-SNAPSHOT with dlc special treated-->
</dependency>
<dependency>
<groupId>com.qcloud</groupId>
<artifactId>dlc-data-catalog-metastore-client</artifactId>
+ <version>1.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
@@ -86,11 +101,9 @@
<groupId>org.ini4j</groupId>
<artifactId>ini4j</artifactId>
</dependency>
-
<dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>sort-connector-iceberg</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-flink-runtime-1.13</artifactId>
</dependency>
<dependency>
@@ -141,12 +154,15 @@
<include>com.qcloud.cos:*</include>
<include>com.qcloud:*</include>
<include>com.tencentcloudapi:*</include>
+ <include>com.tencent.cloud.dlc:*</include>
<include>commons-logging:commons-logging</include>
+ <include>commons-codec:commons-codec</include>
<include>com.squareup.okio:okio</include>
<include>com.squareup.okhttp:okhttp</include>
<include>com.squareup.okhttp:logging-interceptor</include>
<include>org.ini4j:ini4j</include>
<include>com.google.code.gson:gson</include>
+ <include>javax.xml.bind:jaxb-api</include>
</includes>
</artifactSet>
<filters>
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DlcWrappedHybrisCatalog.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DlcWrappedHybrisCatalog.java
index a8f64db52..c303e21e2 100644
--- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DlcWrappedHybrisCatalog.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/catalog/hybris/DlcWrappedHybrisCatalog.java
@@ -23,11 +23,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import com.qcloud.dlc.common.Constants;
import com.qcloud.dlc.metastore.DLCDataCatalogMetastoreClient;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CosNConfigKeys;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.auth.DlcCloudCredentialsProvider;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -71,10 +75,25 @@ import org.slf4j.LoggerFactory;
public class DlcWrappedHybrisCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
public static final String LIST_ALL_TABLES = "list-all-tables";
public static final String LIST_ALL_TABLES_DEFAULT = "false";
-
- // dlc auth
- public static final String DLC_ENDPOINT = "qcloud.dlc.endpoint";
-
+ public static final Set<String> DLC_WHITELIST_PARAMS = Stream.of(
+ Constants.DLC_REGION_CONF,
+ Constants.DLC_ENDPOINT,
+ Constants.DLC_REGION_CONF,
+ Constants.DLC_SECRET_ID_CONF,
+ Constants.DLC_SECRET_KEY_CONF,
+ DlcCloudCredentialsProvider.END_POINT,
+ DlcCloudCredentialsProvider.SECRET_ID,
+ DlcCloudCredentialsProvider.SECRET_KEY,
+ DlcCloudCredentialsProvider.REGION,
+ DlcCloudCredentialsProvider.USER_APPID,
+ DlcCloudCredentialsProvider.REQUEST_IDENTITY_TOKEN,
+ CosNConfigKeys.COSN_USERINFO_SECRET_ID_KEY,
+ CosNConfigKeys.COSN_USERINFO_SECRET_KEY_KEY,
+ CosNConfigKeys.COSN_REGION_PREV_KEY,
+ CosNConfigKeys.COSN_CREDENTIALS_PROVIDER,
+ "fs.lakefs.impl",
+ "fs.cosn.impl"
+ ).collect(Collectors.toSet());
private static final Logger LOG = LoggerFactory.getLogger(DlcWrappedHybrisCatalog.class);
@@ -95,8 +114,14 @@ public class DlcWrappedHybrisCatalog extends BaseMetastoreCatalog implements Sup
this.conf = new Configuration();
}
+ // dlc auth
+ properties.entrySet().stream()
+ .filter(entry -> DLC_WHITELIST_PARAMS.contains(entry.getKey()))
+ .forEach(entry -> this.conf.set(entry.getKey(), entry.getValue()));
+
if (properties.containsKey(CatalogProperties.URI)) {
- this.conf.set(DLC_ENDPOINT, properties.get(CatalogProperties.URI));
+ this.conf.set(Constants.DLC_ENDPOINT, properties.get(CatalogProperties.URI));
+ this.conf.set(DlcCloudCredentialsProvider.END_POINT, properties.get(CatalogProperties.URI));
}
if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
@@ -104,16 +129,6 @@ public class DlcWrappedHybrisCatalog extends BaseMetastoreCatalog implements Sup
properties.get(CatalogProperties.WAREHOUSE_LOCATION));
}
- // dlc auth
- properties.entrySet().stream()
- .filter(entry -> entry.getKey().startsWith("qcloud.dlc"))
- .forEach(entry -> this.conf.set(entry.getKey(), entry.getValue()));
-
- // lakefs auth
- properties.entrySet().stream()
- .filter(entry -> entry.getKey().startsWith("fs"))
- .forEach(entry -> this.conf.set(entry.getKey(), entry.getValue()));
-
this.listAllTables = Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));
String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/CompactTableProperties.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/CompactTableProperties.java
new file mode 100644
index 000000000..08efdc00d
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/CompactTableProperties.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class CompactTableProperties {
+ public static final String COMPACT_PREFIX = "write.compact.";
+
+ public static final String COMPACT_ENABLED = "write.compact.enable";
+ public static final boolean COMPACT_ENABLED_DEFAULT = false;
+
+ public static final String COMPACT_INTERVAL = "write.compact.snapshot.interval";
+ public static final int COMPACT_INTERVAL_DEFAULT = 5;
+
+ public static final String COMPACT_RESOUCE_POOL = "write.compact.resource.name";
+ public static final String COMPACT_RESOUCE_POOL_DEFAULT = "default";
+
+ // Supported by spark rewrite action option
+ public static final String COMPACT_MAX_CONCURRENT_FILE_GROUP_REWRITES
+ = "write.compact.max-concurrent-file-group-rewrites";
+ public static final int COMPACT_MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT = 1;
+
+ public static final String COMPACT_MAX_FILE_GROUP_SIZE_BYTES = "write.compact.max-file-group-size-bytes";
+ public static final long COMPACT_MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+ public static final String COMPACT_PARTIAL_PROGRESS_ENABLED = "write.compact.partial-progress.enabled";
+ public static final boolean COMPACT_PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+ public static final String COMPACT_PARTIAL_PROGRESS_MAX_COMMITS = "write.compact.partial-progress.max-commits";
+ public static final int COMPACT_PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+ public static final String COMPACT_TARGET_FILE_SIZE_BYTES = "write.compact.target-file-size-bytes";
+ public static final int COMPACT_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024; // 512 MB
+
+ public static final String COMPACT_USE_STARTING_SEQUENCE_NUMBER = "write.compact.use-starting-sequence-number";
+ public static final boolean COMPACT_USE_STARTING_SEQUENCE_NUMBER_DEFAULT = true;
+
+ public static final String COMPACT_MIN_INPUT_FILES = "write.compact.min-input-files";
+ public static final int COMPACT_MIN_INPUT_FILES_DEFAULT = 5;
+
+ public static final String COMPACT_DELETE_FILE_THRESHOLD = "write.compact.delete-file-threshold";
+ public static final int COMPACT_DELETE_FILE_THRESHOLD_DEFAULT = Integer.MAX_VALUE;
+
+ public static final String COMPACT_MIN_FILE_SIZE_BYTES = "write.compact.min-file-size-bytes";
+ public static final double COMPACT_MIN_FILE_SIZE_BYTES_DEFAULT = 0.75d * COMPACT_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+ public static final String COMPACT_MAX_FILE_SIZE_BYTES = "write.compact.max-file-size-bytes";
+ public static final double COMPACT_MAX_FILE_SIZE_BYTES_DEFAULT = 1.80d * COMPACT_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+ public static final Set<String> TABLE_AUTO_COMPACT_PROPERTIES = Stream.of(
+ COMPACT_ENABLED,
+ COMPACT_INTERVAL,
+ COMPACT_RESOUCE_POOL,
+ COMPACT_MAX_CONCURRENT_FILE_GROUP_REWRITES,
+ COMPACT_MAX_FILE_GROUP_SIZE_BYTES,
+ COMPACT_PARTIAL_PROGRESS_ENABLED,
+ COMPACT_PARTIAL_PROGRESS_MAX_COMMITS,
+ COMPACT_TARGET_FILE_SIZE_BYTES,
+ COMPACT_USE_STARTING_SEQUENCE_NUMBER,
+ COMPACT_MIN_INPUT_FILES,
+ COMPACT_DELETE_FILE_THRESHOLD,
+ COMPACT_MIN_FILE_SIZE_BYTES,
+ COMPACT_MAX_FILE_SIZE_BYTES
+ ).collect(Collectors.toSet());
+
+ public static final Set<String> ACTION_AUTO_COMPACT_OPTIONS = Stream.of(
+ COMPACT_MAX_CONCURRENT_FILE_GROUP_REWRITES,
+ COMPACT_MAX_FILE_GROUP_SIZE_BYTES,
+ COMPACT_PARTIAL_PROGRESS_ENABLED,
+ COMPACT_PARTIAL_PROGRESS_MAX_COMMITS,
+ COMPACT_TARGET_FILE_SIZE_BYTES,
+ COMPACT_USE_STARTING_SEQUENCE_NUMBER,
+ COMPACT_MIN_INPUT_FILES,
+ COMPACT_DELETE_FILE_THRESHOLD,
+ COMPACT_MIN_FILE_SIZE_BYTES,
+ COMPACT_MAX_FILE_SIZE_BYTES
+ ).collect(Collectors.toSet());
+}
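A sketch of how the compaction keys defined above might be supplied as table options for the DLC sink; the values are illustrative, and how the sink actually consumes them is implemented in the actions and sink classes later in this patch:

    // Hedged sketch: illustrative values for the write.compact.* properties defined above.
    Map<String, String> tableOptions = new HashMap<>();
    tableOptions.put(CompactTableProperties.COMPACT_ENABLED, "true");   // write.compact.enable
    tableOptions.put(CompactTableProperties.COMPACT_INTERVAL, "5");     // rewrite every 5 snapshots
    tableOptions.put(CompactTableProperties.COMPACT_TARGET_FILE_SIZE_BYTES,
            String.valueOf(512 * 1024 * 1024));                         // 512 MB target files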
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalog.java
similarity index 95%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalog.java
index 2056a8d02..695bf2ea6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalog.java
@@ -1,731 +1,732 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.inlong.sort.iceberg;
-
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.AbstractCatalog;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogPartition;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
-import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.factories.Factory;
-import org.apache.flink.util.StringUtils;
-import org.apache.iceberg.CachingCatalog;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.Transaction;
-import org.apache.iceberg.UpdateProperties;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.SupportsNamespaces;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.AlreadyExistsException;
-import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
-import org.apache.iceberg.exceptions.NoSuchNamespaceException;
-import org.apache.iceberg.flink.CatalogLoader;
-import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}.
- * <p>
- * The mapping between Flink database and Iceberg namespace:
- * Supplying a base namespace for a given catalog, so if you have a catalog that supports a 2-level namespace, you
- * would supply the first level in the catalog configuration and the second level would be exposed as Flink databases.
- * </p>
- * The Iceberg table manages its partitions by itself. The partition of the Iceberg table is independent of the
- * partition of Flink.
- */
-public class FlinkCatalog extends AbstractCatalog {
-
- private final CatalogLoader catalogLoader;
- private final Catalog icebergCatalog;
- private final Namespace baseNamespace;
- private final SupportsNamespaces asNamespaceCatalog;
- private final Closeable closeable;
- private final boolean cacheEnabled;
-
- public FlinkCatalog(
- String catalogName,
- String defaultDatabase,
- Namespace baseNamespace,
- CatalogLoader catalogLoader,
- boolean cacheEnabled) {
- super(catalogName, defaultDatabase);
- this.catalogLoader = catalogLoader;
- this.baseNamespace = baseNamespace;
- this.cacheEnabled = cacheEnabled;
-
- Catalog originalCatalog = catalogLoader.loadCatalog();
- icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
- asNamespaceCatalog =
- originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
- closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
- }
-
- private static void validateFlinkTable(CatalogBaseTable table) {
- Preconditions.checkArgument(table instanceof CatalogTable, "The Table should be a CatalogTable.");
-
- TableSchema schema = table.getSchema();
- schema.getTableColumns().forEach(column -> {
- if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
- throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
- }
- });
-
- if (!schema.getWatermarkSpecs().isEmpty()) {
- throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");
- }
- }
-
- private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {
- PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
- partitionKeys.forEach(builder::identity);
- return builder.build();
- }
-
- private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {
- List<String> partitionKeys = Lists.newArrayList();
- for (PartitionField field : spec.fields()) {
- if (field.transform().isIdentity()) {
- partitionKeys.add(icebergSchema.findColumnName(field.sourceId()));
- } else {
- // Not created by Flink SQL.
- // For compatibility with iceberg tables, return empty.
- // TODO modify this after Flink support partition transform.
- return Collections.emptyList();
- }
- }
- return partitionKeys;
- }
-
- private static void commitChanges(Table table, String setLocation, String setSnapshotId,
- String pickSnapshotId, Map<String, String> setProperties) {
- // don't allow setting the snapshot and picking a commit
- // at the same time because order is ambiguous and choosing
- // one order leads to different results
- Preconditions.checkArgument(setSnapshotId == null || pickSnapshotId == null,
- "Cannot set the current snapshot ID and cherry-pick snapshot changes");
-
- if (setSnapshotId != null) {
- long newSnapshotId = Long.parseLong(setSnapshotId);
- table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
- }
-
- // if updating the table snapshot, perform that update first in case it fails
- if (pickSnapshotId != null) {
- long newSnapshotId = Long.parseLong(pickSnapshotId);
- table.manageSnapshots().cherrypick(newSnapshotId).commit();
- }
-
- Transaction transaction = table.newTransaction();
-
- if (setLocation != null) {
- transaction.updateLocation()
- .setLocation(setLocation)
- .commit();
- }
-
- if (!setProperties.isEmpty()) {
- UpdateProperties updateProperties = transaction.updateProperties();
- setProperties.forEach((k, v) -> {
- if (v == null) {
- updateProperties.remove(k);
- } else {
- updateProperties.set(k, v);
- }
- });
- updateProperties.commit();
- }
-
- transaction.commitTransaction();
- }
-
- static CatalogTable toCatalogTable(Table table) {
- TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
- List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
-
- // NOTE: We can not create a IcebergCatalogTable extends CatalogTable, because Flink optimizer may use
- // CatalogTableImpl to copy a new catalog table.
- // Let's re-loading table from Iceberg catalog when creating source/sink operators.
- // Iceberg does not have Table comment, so pass a null (Default comment value in Flink).
- return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
- }
-
- @Override
- public void open() throws CatalogException {
- // Create the default database if it does not exist.
- try {
- createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
- } catch (DatabaseAlreadyExistException e) {
- // Ignore the exception if it's already exist.
- }
- }
-
- @Override
- public void close() throws CatalogException {
- if (closeable != null) {
- try {
- closeable.close();
- } catch (IOException e) {
- throw new CatalogException(e);
- }
- }
- }
-
- public Catalog catalog() {
- return icebergCatalog;
- }
-
- private Namespace toNamespace(String database) {
- String[] namespace = new String[baseNamespace.levels().length + 1];
- System.arraycopy(baseNamespace.levels(), 0, namespace, 0, baseNamespace.levels().length);
- namespace[baseNamespace.levels().length] = database;
- return Namespace.of(namespace);
- }
-
- TableIdentifier toIdentifier(ObjectPath path) {
- return TableIdentifier.of(toNamespace(path.getDatabaseName()), path.getObjectName());
- }
-
- @Override
- public List<String> listDatabases() throws CatalogException {
- if (asNamespaceCatalog == null) {
- return Collections.singletonList(getDefaultDatabase());
- }
-
- return asNamespaceCatalog.listNamespaces(baseNamespace).stream()
- .map(n -> n.level(n.levels().length - 1))
- .collect(Collectors.toList());
- }
-
- @Override
- public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
- if (asNamespaceCatalog == null) {
- if (!getDefaultDatabase().equals(databaseName)) {
- throw new DatabaseNotExistException(getName(), databaseName);
- } else {
- return new CatalogDatabaseImpl(Maps.newHashMap(), "");
- }
- } else {
- try {
- Map<String, String> metadata =
- Maps.newHashMap(asNamespaceCatalog.loadNamespaceMetadata(toNamespace(databaseName)));
- String comment = metadata.remove("comment");
- return new CatalogDatabaseImpl(metadata, comment);
- } catch (NoSuchNamespaceException e) {
- throw new DatabaseNotExistException(getName(), databaseName, e);
- }
- }
- }
-
- @Override
- public boolean databaseExists(String databaseName) throws CatalogException {
- try {
- getDatabase(databaseName);
- return true;
- } catch (DatabaseNotExistException ignore) {
- return false;
- }
- }
-
- @Override
- public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
- throws DatabaseAlreadyExistException, CatalogException {
- createDatabase(name, mergeComment(database.getProperties(), database.getComment()), ignoreIfExists);
- }
-
- private void createDatabase(String databaseName, Map<String, String> metadata, boolean ignoreIfExists)
- throws DatabaseAlreadyExistException, CatalogException {
- if (asNamespaceCatalog != null) {
- try {
- asNamespaceCatalog.createNamespace(toNamespace(databaseName), metadata);
- } catch (AlreadyExistsException e) {
- if (!ignoreIfExists) {
- throw new DatabaseAlreadyExistException(getName(), databaseName, e);
- }
- }
- } else {
- throw new UnsupportedOperationException("Namespaces are not supported by catalog: " + getName());
- }
- }
-
- private Map<String, String> mergeComment(Map<String, String> metadata, String comment) {
- Map<String, String> ret = Maps.newHashMap(metadata);
- if (metadata.containsKey("comment")) {
- throw new CatalogException("Database properties should not contain key: 'comment'.");
- }
-
- if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
- ret.put("comment", comment);
- }
- return ret;
- }
-
- @Override
- public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
- throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
- if (asNamespaceCatalog != null) {
- try {
- boolean success = asNamespaceCatalog.dropNamespace(toNamespace(name));
- if (!success && !ignoreIfNotExists) {
- throw new DatabaseNotExistException(getName(), name);
- }
- } catch (NoSuchNamespaceException e) {
- if (!ignoreIfNotExists) {
- throw new DatabaseNotExistException(getName(), name, e);
- }
- } catch (NamespaceNotEmptyException e) {
- throw new DatabaseNotEmptyException(getName(), name, e);
- }
- } else {
- if (!ignoreIfNotExists) {
- throw new DatabaseNotExistException(getName(), name);
- }
- }
- }
-
- @Override
- public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
- throws DatabaseNotExistException, CatalogException {
- if (asNamespaceCatalog != null) {
- Namespace namespace = toNamespace(name);
- Map<String, String> updates = Maps.newHashMap();
- Set<String> removals = Sets.newHashSet();
-
- try {
- Map<String, String> oldProperties = asNamespaceCatalog.loadNamespaceMetadata(namespace);
- Map<String, String> newProperties = mergeComment(newDatabase.getProperties(), newDatabase.getComment());
-
- for (String key : oldProperties.keySet()) {
- if (!newProperties.containsKey(key)) {
- removals.add(key);
- }
- }
-
- for (Map.Entry<String, String> entry : newProperties.entrySet()) {
- if (!entry.getValue().equals(oldProperties.get(entry.getKey()))) {
- updates.put(entry.getKey(), entry.getValue());
- }
- }
-
- if (!updates.isEmpty()) {
- asNamespaceCatalog.setProperties(namespace, updates);
- }
-
- if (!removals.isEmpty()) {
- asNamespaceCatalog.removeProperties(namespace, removals);
- }
-
- } catch (NoSuchNamespaceException e) {
- if (!ignoreIfNotExists) {
- throw new DatabaseNotExistException(getName(), name, e);
- }
- }
- } else {
- if (getDefaultDatabase().equals(name)) {
- throw new CatalogException(
- "Can not alter the default database when the iceberg catalog doesn't support namespaces.");
- }
- if (!ignoreIfNotExists) {
- throw new DatabaseNotExistException(getName(), name);
- }
- }
- }
-
- @Override
- public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
- try {
- return icebergCatalog.listTables(toNamespace(databaseName)).stream()
- .map(TableIdentifier::name)
- .collect(Collectors.toList());
- } catch (NoSuchNamespaceException e) {
- throw new DatabaseNotExistException(getName(), databaseName, e);
- }
- }
-
- @Override
- public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
- Table table = loadIcebergTable(tablePath);
- return toCatalogTable(table);
- }
-
- private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
- try {
- Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
- if (cacheEnabled) {
- table.refresh();
- }
-
- return table;
- } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
- throw new TableNotExistException(getName(), tablePath, e);
- }
- }
-
- @Override
- public boolean tableExists(ObjectPath tablePath) throws CatalogException {
- return icebergCatalog.tableExists(toIdentifier(tablePath));
- }
-
- @Override
- public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
- throws TableNotExistException, CatalogException {
- try {
- icebergCatalog.dropTable(toIdentifier(tablePath));
- } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
- if (!ignoreIfNotExists) {
- throw new TableNotExistException(getName(), tablePath, e);
- }
- }
- }
-
- @Override
- public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
- throws TableNotExistException, TableAlreadyExistException, CatalogException {
- try {
- icebergCatalog.renameTable(
- toIdentifier(tablePath),
- toIdentifier(new ObjectPath(tablePath.getDatabaseName(), newTableName)));
- } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
- if (!ignoreIfNotExists) {
- throw new TableNotExistException(getName(), tablePath, e);
- }
- } catch (AlreadyExistsException e) {
- throw new TableAlreadyExistException(getName(), tablePath, e);
- }
- }
-
- @Override
- public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
- throws CatalogException, TableAlreadyExistException {
- if (Objects.equals(table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
- throw new IllegalArgumentException("Cannot create the table with 'connector'='iceberg' table property in "
- + "an iceberg catalog, Please create table with 'connector'='iceberg' "
- + "property in a non-iceberg catalog "
- + "or create table without 'connector'='iceberg' related properties in an iceberg table.");
- }
-
- createIcebergTable(tablePath, table, ignoreIfExists);
- }
-
- void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
- throws CatalogException, TableAlreadyExistException {
- validateFlinkTable(table);
-
- Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
- PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
-
- ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
- String location = null;
- for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
- if ("location".equalsIgnoreCase(entry.getKey())) {
- location = entry.getValue();
- } else {
- properties.put(entry.getKey(), entry.getValue());
- }
- }
-
- try {
- icebergCatalog.createTable(
- toIdentifier(tablePath),
- icebergSchema,
- spec,
- location,
- properties.build());
- } catch (AlreadyExistsException e) {
- if (!ignoreIfExists) {
- throw new TableAlreadyExistException(getName(), tablePath, e);
- }
- }
- }
-
- @Override
- public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
- throws CatalogException, TableNotExistException {
- validateFlinkTable(newTable);
-
- Table icebergTable;
- try {
- icebergTable = loadIcebergTable(tablePath);
- } catch (TableNotExistException e) {
- if (!ignoreIfNotExists) {
- throw e;
- } else {
- return;
- }
- }
-
- CatalogTable table = toCatalogTable(icebergTable);
-
- // Currently, Flink SQL only support altering table properties.
-
- // For current Flink Catalog API, support for adding/removing/renaming columns cannot be done by comparing
- // CatalogTable instances, unless the Flink schema contains Iceberg column IDs.
- if (!table.getSchema().equals(newTable.getSchema())) {
- throw new UnsupportedOperationException("Altering schema is not supported yet.");
- }
-
- if (!table.getPartitionKeys().equals(((CatalogTable) newTable).getPartitionKeys())) {
- throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
- }
-
- Map<String, String> oldProperties = table.getOptions();
- Map<String, String> setProperties = Maps.newHashMap();
-
- String setLocation = null;
- String setSnapshotId = null;
- String pickSnapshotId = null;
-
- for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
-
- if (Objects.equals(value, oldProperties.get(key))) {
- continue;
- }
-
- if ("location".equalsIgnoreCase(key)) {
- setLocation = value;
- } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
- setSnapshotId = value;
- } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
- pickSnapshotId = value;
- } else {
- setProperties.put(key, value);
- }
- }
-
- oldProperties.keySet().forEach(k -> {
- if (!newTable.getOptions().containsKey(k)) {
- setProperties.put(k, null);
- }
- });
-
- commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
- }
-
- @Override
- public Optional<Factory> getFactory() {
- return Optional.of(new FlinkDynamicTableFactory(this));
- }
-
- CatalogLoader getCatalogLoader() {
- return catalogLoader;
- }
-
- // ------------------------------ Unsupported methods ---------------------------------------------
-
- @Override
- public List<String> listViews(String databaseName) throws CatalogException {
- return Collections.emptyList();
- }
-
- @Override
- public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition,
- boolean ignoreIfExists) throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
- throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition,
- boolean ignoreIfNotExists) throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<String> listFunctions(String dbName) throws CatalogException {
- return Collections.emptyList();
- }
-
- @Override
- public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
- throw new FunctionNotExistException(getName(), functionPath);
- }
-
- @Override
- public boolean functionExists(ObjectPath functionPath) throws CatalogException {
- return false;
- }
-
- @Override
- public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
- throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
- throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
- throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics,
- boolean ignoreIfNotExists) throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics,
- boolean ignoreIfNotExists) throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
- CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
- CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
- throws TableNotExistException, TableNotPartitionedException, CatalogException {
- Table table = loadIcebergTable(tablePath);
-
- if (table.spec().isUnpartitioned()) {
- throw new TableNotPartitionedException(icebergCatalog.name(), tablePath);
- }
-
- Set<CatalogPartitionSpec> set = Sets.newHashSet();
- try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
- for (DataFile dataFile : CloseableIterable.transform(tasks, FileScanTask::file)) {
- Map<String, String> map = Maps.newHashMap();
- StructLike structLike = dataFile.partition();
- PartitionSpec spec = table.specs().get(dataFile.specId());
- for (int i = 0; i < structLike.size(); i++) {
- map.put(spec.fields().get(i).name(), String.valueOf(structLike.get(i, Object.class)));
- }
- set.add(new CatalogPartitionSpec(map));
- }
- } catch (IOException e) {
- throw new CatalogException(String.format("Failed to list partitions of table %s", tablePath), e);
- }
-
- return Lists.newArrayList(set);
- }
-
- @Override
- public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
- throws CatalogException {
- throw new UnsupportedOperationException();
- }
-
- // After partition pruning and filter push down, the statistics have become very inaccurate, so the statistics from
- // here are of little significance.
- // Flink will support something like SupportsReportStatistics in future.
-
- @Override
- public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
- throws CatalogException {
- return CatalogTableStatistics.UNKNOWN;
- }
-
- @Override
- public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
- throws CatalogException {
- return CatalogColumnStatistics.UNKNOWN;
- }
-
- @Override
- public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
- throws CatalogException {
- return CatalogTableStatistics.UNKNOWN;
- }
-
- @Override
- public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath,
- CatalogPartitionSpec partitionSpec)
- throws CatalogException {
- return CatalogColumnStatistics.UNKNOWN;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.util.StringUtils;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}.
+ * <p>
+ * The mapping between Flink database and Iceberg namespace:
+ * Supply a base namespace for a given catalog: if the catalog supports a 2-level namespace, the first level is
+ * supplied in the catalog configuration and the second level is exposed as Flink databases.
+ * <p>
+ * The Iceberg table manages its partitions by itself. The partition of the Iceberg table is independent of the
+ * partition of Flink.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+public class FlinkCatalog extends AbstractCatalog {
+
+ private final CatalogLoader catalogLoader;
+ private final Catalog icebergCatalog;
+ private final Namespace baseNamespace;
+ private final SupportsNamespaces asNamespaceCatalog;
+ private final Closeable closeable;
+ private final boolean cacheEnabled;
+
+ public FlinkCatalog(
+ String catalogName,
+ String defaultDatabase,
+ Namespace baseNamespace,
+ CatalogLoader catalogLoader,
+ boolean cacheEnabled) {
+ super(catalogName, defaultDatabase);
+ this.catalogLoader = catalogLoader;
+ this.baseNamespace = baseNamespace;
+ this.cacheEnabled = cacheEnabled;
+
+ Catalog originalCatalog = catalogLoader.loadCatalog();
+ icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
+ asNamespaceCatalog = originalCatalog instanceof SupportsNamespaces
+ ? (SupportsNamespaces) originalCatalog : null;
+ closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ // Create the default database if it does not exist.
+ try {
+ createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
+ } catch (DatabaseAlreadyExistException e) {
+            // Ignore the exception if the default database already exists.
+ }
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (IOException e) {
+ throw new CatalogException(e);
+ }
+ }
+ }
+
+ public Catalog catalog() {
+ return icebergCatalog;
+ }
+
+ private Namespace toNamespace(String database) {
+ String[] namespace = new String[baseNamespace.levels().length + 1];
+ System.arraycopy(baseNamespace.levels(), 0, namespace, 0, baseNamespace.levels().length);
+ namespace[baseNamespace.levels().length] = database;
+ return Namespace.of(namespace);
+ }
+
+ TableIdentifier toIdentifier(ObjectPath path) {
+ return TableIdentifier.of(toNamespace(path.getDatabaseName()), path.getObjectName());
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ if (asNamespaceCatalog == null) {
+ return Collections.singletonList(getDefaultDatabase());
+ }
+
+ return asNamespaceCatalog.listNamespaces(baseNamespace).stream()
+ .map(n -> n.level(n.levels().length - 1))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+ if (asNamespaceCatalog == null) {
+ if (!getDefaultDatabase().equals(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ } else {
+ return new CatalogDatabaseImpl(Maps.newHashMap(), "");
+ }
+ } else {
+ try {
+ Map<String, String> metadata =
+ Maps.newHashMap(asNamespaceCatalog.loadNamespaceMetadata(toNamespace(databaseName)));
+ String comment = metadata.remove("comment");
+ return new CatalogDatabaseImpl(metadata, comment);
+ } catch (NoSuchNamespaceException e) {
+ throw new DatabaseNotExistException(getName(), databaseName, e);
+ }
+ }
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException {
+ try {
+ getDatabase(databaseName);
+ return true;
+ } catch (DatabaseNotExistException ignore) {
+ return false;
+ }
+ }
+
+ @Override
+ public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ createDatabase(name, mergeComment(database.getProperties(), database.getComment()), ignoreIfExists);
+ }
+
+ private void createDatabase(String databaseName, Map<String, String> metadata, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ if (asNamespaceCatalog != null) {
+ try {
+ asNamespaceCatalog.createNamespace(toNamespace(databaseName), metadata);
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new DatabaseAlreadyExistException(getName(), databaseName, e);
+ }
+ }
+ } else {
+ throw new UnsupportedOperationException("Namespaces are not supported by catalog: " + getName());
+ }
+ }
+
+ private Map<String, String> mergeComment(Map<String, String> metadata, String comment) {
+ Map<String, String> ret = Maps.newHashMap(metadata);
+ if (metadata.containsKey("comment")) {
+ throw new CatalogException("Database properties should not contain key: 'comment'.");
+ }
+
+ if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+ ret.put("comment", comment);
+ }
+ return ret;
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+ if (asNamespaceCatalog != null) {
+ try {
+ boolean success = asNamespaceCatalog.dropNamespace(toNamespace(name));
+ if (!success && !ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name);
+ }
+ } catch (NoSuchNamespaceException e) {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name, e);
+ }
+ } catch (NamespaceNotEmptyException e) {
+ throw new DatabaseNotEmptyException(getName(), name, e);
+ }
+ } else {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name);
+ }
+ }
+ }
+
+ @Override
+ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ if (asNamespaceCatalog != null) {
+ Namespace namespace = toNamespace(name);
+ Map<String, String> updates = Maps.newHashMap();
+ Set<String> removals = Sets.newHashSet();
+
+ try {
+ Map<String, String> oldProperties = asNamespaceCatalog.loadNamespaceMetadata(namespace);
+ Map<String, String> newProperties = mergeComment(newDatabase.getProperties(), newDatabase.getComment());
+
+ for (String key : oldProperties.keySet()) {
+ if (!newProperties.containsKey(key)) {
+ removals.add(key);
+ }
+ }
+
+ for (Map.Entry<String, String> entry : newProperties.entrySet()) {
+ if (!entry.getValue().equals(oldProperties.get(entry.getKey()))) {
+ updates.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ if (!updates.isEmpty()) {
+ asNamespaceCatalog.setProperties(namespace, updates);
+ }
+
+ if (!removals.isEmpty()) {
+ asNamespaceCatalog.removeProperties(namespace, removals);
+ }
+
+ } catch (NoSuchNamespaceException e) {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name, e);
+ }
+ }
+ } else {
+ if (getDefaultDatabase().equals(name)) {
+ throw new CatalogException(
+ "Can not alter the default database when the iceberg catalog doesn't support namespaces.");
+ }
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name);
+ }
+ }
+ }
+
+ @Override
+ public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+ try {
+ return icebergCatalog.listTables(toNamespace(databaseName)).stream()
+ .map(TableIdentifier::name)
+ .collect(Collectors.toList());
+ } catch (NoSuchNamespaceException e) {
+ throw new DatabaseNotExistException(getName(), databaseName, e);
+ }
+ }
+
+ @Override
+ public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+ Table table = loadIcebergTable(tablePath);
+ return toCatalogTable(table);
+ }
+
+ private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
+ try {
+ Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
+ if (cacheEnabled) {
+ table.refresh();
+ }
+
+ return table;
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ throw new TableNotExistException(getName(), tablePath, e);
+ }
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ return icebergCatalog.tableExists(toIdentifier(tablePath));
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ try {
+ icebergCatalog.dropTable(toIdentifier(tablePath));
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(getName(), tablePath, e);
+ }
+ }
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException, CatalogException {
+ try {
+ icebergCatalog.renameTable(
+ toIdentifier(tablePath),
+ toIdentifier(new ObjectPath(tablePath.getDatabaseName(), newTableName)));
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(getName(), tablePath, e);
+ }
+ } catch (AlreadyExistsException e) {
+ throw new TableAlreadyExistException(getName(), tablePath, e);
+ }
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+ throws CatalogException, TableAlreadyExistException {
+ if (Objects.equals(table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
+            throw new IllegalArgumentException("Cannot create a table with the 'connector'='dlc-inlong' property "
+                    + "in an iceberg catalog. Please create the table with this property in a non-iceberg catalog, "
+                    + "or create it without 'connector'-related properties in an iceberg catalog.");
+ }
+
+ createIcebergTable(tablePath, table, ignoreIfExists);
+ }
+
+ void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+ throws CatalogException, TableAlreadyExistException {
+ validateFlinkTable(table);
+
+ Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+ PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+
+ ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+ String location = null;
+ for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+ if ("location".equalsIgnoreCase(entry.getKey())) {
+ location = entry.getValue();
+ } else {
+ properties.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ try {
+ icebergCatalog.createTable(
+ toIdentifier(tablePath),
+ icebergSchema,
+ spec,
+ location,
+ properties.build());
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new TableAlreadyExistException(getName(), tablePath, e);
+ }
+ }
+ }
+
+ @Override
+ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+ throws CatalogException, TableNotExistException {
+ validateFlinkTable(newTable);
+
+ Table icebergTable;
+ try {
+ icebergTable = loadIcebergTable(tablePath);
+ } catch (TableNotExistException e) {
+ if (!ignoreIfNotExists) {
+ throw e;
+ } else {
+ return;
+ }
+ }
+
+ CatalogTable table = toCatalogTable(icebergTable);
+
+        // Currently, Flink SQL only supports altering table properties.
+
+        // With the current Flink Catalog API, adding/removing/renaming columns cannot be supported by comparing
+        // CatalogTable instances, unless the Flink schema contains the Iceberg column IDs.
+ if (!table.getSchema().equals(newTable.getSchema())) {
+ throw new UnsupportedOperationException("Altering schema is not supported yet.");
+ }
+
+ if (!table.getPartitionKeys().equals(((CatalogTable) newTable).getPartitionKeys())) {
+ throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
+ }
+
+ Map<String, String> oldProperties = table.getOptions();
+ Map<String, String> setProperties = Maps.newHashMap();
+
+ String setLocation = null;
+ String setSnapshotId = null;
+ String pickSnapshotId = null;
+
+ for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ if (Objects.equals(value, oldProperties.get(key))) {
+ continue;
+ }
+
+ if ("location".equalsIgnoreCase(key)) {
+ setLocation = value;
+ } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+ setSnapshotId = value;
+ } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+ pickSnapshotId = value;
+ } else {
+ setProperties.put(key, value);
+ }
+ }
+
+ oldProperties.keySet().forEach(k -> {
+ if (!newTable.getOptions().containsKey(k)) {
+ setProperties.put(k, null);
+ }
+ });
+
+ commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
+ }
+
+ private static void validateFlinkTable(CatalogBaseTable table) {
+ Preconditions.checkArgument(table instanceof CatalogTable, "The Table should be a CatalogTable.");
+
+ TableSchema schema = table.getSchema();
+ schema.getTableColumns().forEach(column -> {
+ if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
+ throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
+ }
+ });
+
+ if (!schema.getWatermarkSpecs().isEmpty()) {
+ throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");
+ }
+ }
+
+ private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {
+ PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
+ partitionKeys.forEach(builder::identity);
+ return builder.build();
+ }
+
+ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {
+ List<String> partitionKeys = Lists.newArrayList();
+ for (PartitionField field : spec.fields()) {
+ if (field.transform().isIdentity()) {
+ partitionKeys.add(icebergSchema.findColumnName(field.sourceId()));
+ } else {
+                // Not created by Flink SQL.
+                // For compatibility with iceberg tables, return an empty partition key list.
+                // TODO: modify this after Flink supports partition transforms.
+ return Collections.emptyList();
+ }
+ }
+ return partitionKeys;
+ }
+
+ private static void commitChanges(Table table, String setLocation, String setSnapshotId,
+ String pickSnapshotId, Map<String, String> setProperties) {
+ // don't allow setting the snapshot and picking a commit at the same time because order is ambiguous and
+ // choosing one order leads to different results
+ Preconditions.checkArgument(setSnapshotId == null || pickSnapshotId == null,
+ "Cannot set the current snapshot ID and cherry-pick snapshot changes");
+
+ if (setSnapshotId != null) {
+ long newSnapshotId = Long.parseLong(setSnapshotId);
+ table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
+ }
+
+ // if updating the table snapshot, perform that update first in case it fails
+ if (pickSnapshotId != null) {
+ long newSnapshotId = Long.parseLong(pickSnapshotId);
+ table.manageSnapshots().cherrypick(newSnapshotId).commit();
+ }
+
+ Transaction transaction = table.newTransaction();
+
+ if (setLocation != null) {
+ transaction.updateLocation()
+ .setLocation(setLocation)
+ .commit();
+ }
+
+ if (!setProperties.isEmpty()) {
+ UpdateProperties updateProperties = transaction.updateProperties();
+ setProperties.forEach((k, v) -> {
+ if (v == null) {
+ updateProperties.remove(k);
+ } else {
+ updateProperties.set(k, v);
+ }
+ });
+ updateProperties.commit();
+ }
+
+ transaction.commitTransaction();
+ }
+
+ static CatalogTable toCatalogTable(Table table) {
+ TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
+ List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
+
+        // NOTE: We cannot create an IcebergCatalogTable that extends CatalogTable, because the Flink optimizer may
+        // use CatalogTableImpl to copy a new catalog table.
+        // Instead, re-load the table from the Iceberg catalog when creating source/sink operators.
+        // Iceberg does not have a table comment, so pass null (the default comment value in Flink).
+ return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
+ }
+
+ @Override
+ public Optional<Factory> getFactory() {
+ return Optional.of(new FlinkDynamicTableFactory(this));
+ }
+
+ CatalogLoader getCatalogLoader() {
+ return catalogLoader;
+ }
+
+ // ------------------------------ Unsupported methods ---------------------------------------------
+
+ @Override
+ public List<String> listViews(String databaseName) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition,
+ boolean ignoreIfExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition,
+ boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> listFunctions(String dbName) throws CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+ throw new FunctionNotExistException(getName(), functionPath);
+ }
+
+ @Override
+ public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+ return false;
+ }
+
+ @Override
+ public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics,
+ boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
+ CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
+ CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+ throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ Table table = loadIcebergTable(tablePath);
+
+ if (table.spec().isUnpartitioned()) {
+ throw new TableNotPartitionedException(icebergCatalog.name(), tablePath);
+ }
+
+ Set<CatalogPartitionSpec> set = Sets.newHashSet();
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ for (DataFile dataFile : CloseableIterable.transform(tasks, FileScanTask::file)) {
+ Map<String, String> map = Maps.newHashMap();
+ StructLike structLike = dataFile.partition();
+ PartitionSpec spec = table.specs().get(dataFile.specId());
+ for (int i = 0; i < structLike.size(); i++) {
+ map.put(spec.fields().get(i).name(), String.valueOf(structLike.get(i, Object.class)));
+ }
+ set.add(new CatalogPartitionSpec(map));
+ }
+ } catch (IOException e) {
+ throw new CatalogException(String.format("Failed to list partitions of table %s", tablePath), e);
+ }
+
+ return Lists.newArrayList(set);
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+    // After partition pruning and filter push-down, the statistics become very inaccurate, so the statistics
+    // reported here are of little significance.
+    // Flink will support something like SupportsReportStatistics in the future.
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+ throws CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+ throws CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+}
+
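
For illustration only (this sketch is not part of the commit): the database-to-namespace mapping described in the FlinkCatalog javadoc above can be exercised as below, mirroring the toNamespace/toIdentifier logic; the base namespace, database, and table names are hypothetical placeholders.

    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.TableIdentifier;

    public class NamespaceMappingSketch {
        public static void main(String[] args) {
            // Hypothetical base namespace configured on the catalog, and a hypothetical Flink table path.
            Namespace base = Namespace.of("warehouse1");
            ObjectPath path = new ObjectPath("db1", "orders");

            // Mirrors FlinkCatalog#toNamespace: append the Flink database to the base namespace levels.
            String[] levels = new String[base.levels().length + 1];
            System.arraycopy(base.levels(), 0, levels, 0, base.levels().length);
            levels[base.levels().length] = path.getDatabaseName();

            // Mirrors FlinkCatalog#toIdentifier: the Iceberg identifier becomes warehouse1.db1.orders.
            TableIdentifier id = TableIdentifier.of(Namespace.of(levels), path.getObjectName());
            System.out.println(id);
        }
    }
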
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalogFactory.java
similarity index 91%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalogFactory.java
index f4bb7d49f..3ef8405c6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalogFactory.java
@@ -1,166 +1,167 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.inlong.sort.iceberg;
-
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.util.HadoopUtils;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.factories.CatalogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.flink.CatalogLoader;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.base.Strings;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-/**
- * A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
- * <p>
- * This supports the following catalog configuration options:
- * <ul>
- * <li><code>type</code> - Flink catalog factory key, should be "iceberg"</li>
- * <li><code>catalog-type</code> - iceberg catalog type, "hive" or "hadoop"</li>
- * <li><code>uri</code> - the Hive Metastore URI (Hive catalog only)</li>
- * <li><code>clients</code> - the Hive Client Pool Size (Hive catalog only)</li>
- * <li><code>warehouse</code> - the warehouse path (Hadoop catalog only)</li>
- * <li><code>default-database</code> - a database name to use as the default</li>
- * <li><code>base-namespace</code> - a base namespace as the prefix for all databases (Hadoop catalog only)</li>
- * <li><code>cache-enabled</code> - whether to enable catalog cache</li>
- * </ul>
- * </p>
- * To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override
- * {@link #createCatalogLoader(String, Map, Configuration)}.
- */
-public class FlinkCatalogFactory implements CatalogFactory {
-
- // Can not just use "type", it conflicts with CATALOG_TYPE.
- public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
- public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
- public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
-
- public static final String HIVE_CONF_DIR = "hive-conf-dir";
- public static final String DEFAULT_DATABASE = "default-database";
- public static final String DEFAULT_DATABASE_NAME = "default";
- public static final String BASE_NAMESPACE = "base-namespace";
- public static final String CACHE_ENABLED = "cache-enabled";
-
- public static final String TYPE = "type";
- public static final String PROPERTY_VERSION = "property-version";
-
- /**
- * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink catalog adapter.
- *
- * @param name Flink's catalog name
- * @param properties Flink's catalog properties
- * @param hadoopConf Hadoop configuration for catalog
- * @return an Iceberg catalog loader
- */
- static CatalogLoader createCatalogLoader(String name, Map<String, String> properties, Configuration hadoopConf) {
- String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL);
- if (catalogImpl != null) {
- String catalogType = properties.get(ICEBERG_CATALOG_TYPE);
- Preconditions.checkArgument(catalogType == null,
- "Cannot create catalog %s, both catalog-type and "
- + "catalog-impl are set: catalog-type=%s, catalog-impl=%s",
- name, catalogType, catalogImpl);
- return CatalogLoader.custom(name, properties, hadoopConf, catalogImpl);
- }
-
- String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
- switch (catalogType.toLowerCase(Locale.ENGLISH)) {
- case ICEBERG_CATALOG_TYPE_HIVE:
- // The values of properties 'uri', 'warehouse',
- // 'hive-conf-dir' are allowed to be null, in that case it will
- // fallback to parse those values from hadoop configuration which is loaded from classpath.
- String hiveConfDir = properties.get(HIVE_CONF_DIR);
- Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir);
- return CatalogLoader.hive(name, newHadoopConf, properties);
-
- case ICEBERG_CATALOG_TYPE_HADOOP:
- return CatalogLoader.hadoop(name, hadoopConf, properties);
-
- default:
- throw new UnsupportedOperationException("Unknown catalog-type: " + catalogType
- + " (Must be 'hive' or 'hadoop')");
- }
- }
-
- private static Configuration mergeHiveConf(Configuration hadoopConf, String hiveConfDir) {
- Configuration newConf = new Configuration(hadoopConf);
- if (!Strings.isNullOrEmpty(hiveConfDir)) {
- Preconditions.checkState(Files.exists(Paths.get(hiveConfDir, "hive-site.xml")),
- "There should be a hive-site.xml file under the directory %s", hiveConfDir);
- newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));
- } else {
- // If don't provide the hive-site.xml path explicitly, it will try to load resource from classpath. If still
- // couldn't load the configuration file, then it will throw exception in HiveCatalog.
- URL configFile = CatalogLoader.class.getClassLoader().getResource("hive-site.xml");
- if (configFile != null) {
- newConf.addResource(configFile);
- }
- }
- return newConf;
- }
-
- public static Configuration clusterHadoopConf() {
- return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
- }
-
- @Override
- public Map<String, String> requiredContext() {
- Map<String, String> context = Maps.newHashMap();
- context.put(TYPE, "iceberg");
- context.put(PROPERTY_VERSION, "1");
- return context;
- }
-
- @Override
- public List<String> supportedProperties() {
- return ImmutableList.of("*");
- }
-
- @Override
- public Catalog createCatalog(String name, Map<String, String> properties) {
- return createCatalog(name, properties, clusterHadoopConf());
- }
-
- protected Catalog createCatalog(String name, Map<String, String> properties, Configuration hadoopConf) {
- CatalogLoader catalogLoader = createCatalogLoader(name, properties, hadoopConf);
- String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, DEFAULT_DATABASE_NAME);
-
- Namespace baseNamespace = Namespace.empty();
- if (properties.containsKey(BASE_NAMESPACE)) {
- baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
- }
-
- boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
- return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink;
+
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
+ * <p>
+ * This supports the following catalog configuration options:
+ * <ul>
+ * <li><code>type</code> - Flink catalog factory key, should be "iceberg"</li>
+ * <li><code>catalog-type</code> - iceberg catalog type, "hive" or "hadoop"</li>
+ * <li><code>uri</code> - the Hive Metastore URI (Hive catalog only)</li>
+ * <li><code>clients</code> - the Hive Client Pool Size (Hive catalog only)</li>
+ * <li><code>warehouse</code> - the warehouse path (Hadoop catalog only)</li>
+ * <li><code>default-database</code> - a database name to use as the default</li>
+ * <li><code>base-namespace</code> - a base namespace as the prefix for all databases (Hadoop catalog only)</li>
+ * <li><code>cache-enabled</code> - whether to enable catalog cache</li>
+ * </ul>
+ * <p>
+ * To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override
+ * {@link #createCatalogLoader(String, Map, Configuration)}.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+public class FlinkCatalogFactory implements CatalogFactory {
+
+    // Cannot just use "type"; it conflicts with CATALOG_TYPE.
+ public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
+ public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+ public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+
+ public static final String HIVE_CONF_DIR = "hive-conf-dir";
+ public static final String DEFAULT_DATABASE = "default-database";
+ public static final String DEFAULT_DATABASE_NAME = "default";
+ public static final String BASE_NAMESPACE = "base-namespace";
+ public static final String CACHE_ENABLED = "cache-enabled";
+
+ public static final String TYPE = "type";
+ public static final String PROPERTY_VERSION = "property-version";
+
+ /**
+ * Create an Iceberg {@link org.apache.iceberg.catalog.Catalog} loader to be used by this Flink catalog adapter.
+ *
+ * @param name Flink's catalog name
+ * @param properties Flink's catalog properties
+ * @param hadoopConf Hadoop configuration for catalog
+ * @return an Iceberg catalog loader
+ */
+ static CatalogLoader createCatalogLoader(String name, Map<String, String> properties, Configuration hadoopConf) {
+ String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL);
+ if (catalogImpl != null) {
+ String catalogType = properties.get(ICEBERG_CATALOG_TYPE);
+ Preconditions.checkArgument(catalogType == null,
+ "Cannot create catalog %s, both catalog-type and catalog-impl are set: "
+ + "catalog-type=%s, catalog-impl=%s",
+ name, catalogType, catalogImpl);
+ return CatalogLoader.custom(name, properties, hadoopConf, catalogImpl);
+ }
+
+ String catalogType = properties.getOrDefault(ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
+ switch (catalogType.toLowerCase(Locale.ENGLISH)) {
+ case ICEBERG_CATALOG_TYPE_HIVE:
+                // The values of the properties 'uri', 'warehouse' and 'hive-conf-dir' are allowed to be null; in
+                // that case they fall back to the values parsed from the Hadoop configuration on the classpath.
+ String hiveConfDir = properties.get(HIVE_CONF_DIR);
+ Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir);
+ return CatalogLoader.hive(name, newHadoopConf, properties);
+
+ case ICEBERG_CATALOG_TYPE_HADOOP:
+ return CatalogLoader.hadoop(name, hadoopConf, properties);
+
+ default:
+ throw new UnsupportedOperationException("Unknown catalog-type: " + catalogType
+ + " (Must be 'hive' or 'hadoop')");
+ }
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = Maps.newHashMap();
+ context.put(TYPE, "iceberg");
+ context.put(PROPERTY_VERSION, "1");
+ return context;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ return ImmutableList.of("*");
+ }
+
+ @Override
+ public Catalog createCatalog(String name, Map<String, String> properties) {
+ return createCatalog(name, properties, clusterHadoopConf());
+ }
+
+ protected Catalog createCatalog(String name, Map<String, String> properties, Configuration hadoopConf) {
+ CatalogLoader catalogLoader = createCatalogLoader(name, properties, hadoopConf);
+ String defaultDatabase = properties.getOrDefault(DEFAULT_DATABASE, DEFAULT_DATABASE_NAME);
+
+ Namespace baseNamespace = Namespace.empty();
+ if (properties.containsKey(BASE_NAMESPACE)) {
+ baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
+ }
+
+ boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
+ return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
+ }
+
+ private static Configuration mergeHiveConf(Configuration hadoopConf, String hiveConfDir) {
+ Configuration newConf = new Configuration(hadoopConf);
+ if (!Strings.isNullOrEmpty(hiveConfDir)) {
+ Preconditions.checkState(Files.exists(Paths.get(hiveConfDir, "hive-site.xml")),
+ "There should be a hive-site.xml file under the directory %s", hiveConfDir);
+ newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));
+ } else {
+            // If the hive-site.xml path is not provided explicitly, try to load the resource from the classpath.
+            // If the configuration file still cannot be loaded, HiveCatalog will throw an exception.
+ URL configFile = CatalogLoader.class.getClassLoader().getResource("hive-site.xml");
+ if (configFile != null) {
+ newConf.addResource(configFile);
+ }
+ }
+ return newConf;
+ }
+
+ public static Configuration clusterHadoopConf() {
+ return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
+ }
+}
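
For illustration only (not part of the commit): a minimal sketch of how the catalog options listed in the FlinkCatalogFactory javadoc translate into a catalog instance; the warehouse path and default database are hypothetical placeholders.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.table.catalog.Catalog;
    import org.apache.inlong.sort.iceberg.flink.FlinkCatalogFactory;

    public class CatalogFactorySketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put(FlinkCatalogFactory.TYPE, "iceberg");
            props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HADOOP);
            props.put("warehouse", "file:///tmp/iceberg/warehouse");    // hypothetical warehouse path
            props.put(FlinkCatalogFactory.DEFAULT_DATABASE, "demo_db"); // hypothetical default database
            props.put(FlinkCatalogFactory.CACHE_ENABLED, "false");

            // Builds a CatalogLoader for a Hadoop-type Iceberg catalog and wraps it in a FlinkCatalog.
            Catalog catalog = new FlinkCatalogFactory().createCatalog("demo_iceberg", props);
            System.out.println(catalog.getClass().getName());
        }
    }
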
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
similarity index 78%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
index e10626148..35d3a6c76 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
@@ -1,222 +1,233 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.inlong.sort.iceberg;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.factories.DynamicTableSinkFactory;
-import org.apache.flink.table.factories.DynamicTableSourceFactory;
-import org.apache.flink.table.utils.TableSchemaUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.common.DynMethods;
-import org.apache.iceberg.exceptions.AlreadyExistsException;
-import org.apache.iceberg.flink.IcebergTableSource;
-import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
-import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
-
-/**
- * Copy from org.apache.iceberg.flink:iceberg-flink-runtime-1.13:0.13.1
- *
- * <p>
- * Factory for creating configured instances of {@link IcebergTableSource} and {@link
- * IcebergTableSink}.We modify KafkaDynamicTableSink to support append-mode .
- * </p>
- */
-public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
-
- static final String FACTORY_IDENTIFIER = "iceberg-inlong";
-
- private static final ConfigOption<String> CATALOG_NAME =
- ConfigOptions.key("catalog-name")
- .stringType()
- .noDefaultValue()
- .withDescription("Catalog name");
-
- private static final ConfigOption<String> CATALOG_TYPE =
- ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
- .stringType()
- .noDefaultValue()
- .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
-
- private static final ConfigOption<String> CATALOG_DATABASE =
- ConfigOptions.key("catalog-database")
- .stringType()
- .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
- .withDescription("Database name managed in the iceberg catalog.");
-
- private static final ConfigOption<String> CATALOG_TABLE =
- ConfigOptions.key("catalog-table")
- .stringType()
- .noDefaultValue()
- .withDescription("Table name managed in the underlying iceberg catalog and database.");
-
- // Flink 1.13.x change the return type from CatalogTable interface to ResolvedCatalogTable which extends the
- // CatalogTable. Here we use the dynamic method loading approach to avoid adding explicit CatalogTable or
- // ResolvedCatalogTable class into the iceberg-flink-runtime jar for compatibility purpose.
- private static final DynMethods.UnboundMethod GET_CATALOG_TABLE = DynMethods.builder("getCatalogTable")
- .impl(Context.class, "getCatalogTable")
- .orNoop()
- .build();
-
- private final FlinkCatalog catalog;
-
- public FlinkDynamicTableFactory() {
- this.catalog = null;
- }
-
- public FlinkDynamicTableFactory(FlinkCatalog catalog) {
- this.catalog = catalog;
- }
-
- private static CatalogTable loadCatalogTable(Context context) {
- return GET_CATALOG_TABLE.invoke(context);
- }
-
- private static TableLoader createTableLoader(CatalogBaseTable catalogBaseTable,
- Map<String, String> tableProps,
- String databaseName,
- String tableName) {
- Configuration flinkConf = new Configuration();
- tableProps.forEach(flinkConf::setString);
-
- String catalogName = flinkConf.getString(CATALOG_NAME);
- Preconditions.checkNotNull(catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
-
- String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
- Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");
-
- String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
- Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");
-
- org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
- FlinkCatalogFactory factory = new FlinkCatalogFactory();
- FlinkCatalog flinkCatalog = (FlinkCatalog) factory.createCatalog(catalogName, tableProps, hadoopConf);
- ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);
-
- // Create database if not exists in the external catalog.
- if (!flinkCatalog.databaseExists(catalogDatabase)) {
- try {
- flinkCatalog.createDatabase(catalogDatabase, new CatalogDatabaseImpl(Maps.newHashMap(), null), true);
- } catch (DatabaseAlreadyExistException e) {
- throw new AlreadyExistsException(e, "Database %s already exists in the iceberg catalog %s.",
- catalogName,
- catalogDatabase);
- }
- }
-
- // Create table if not exists in the external catalog.
- if (!flinkCatalog.tableExists(objectPath)) {
- try {
- flinkCatalog.createIcebergTable(objectPath, catalogBaseTable, true);
- } catch (TableAlreadyExistException e) {
- throw new AlreadyExistsException(e, "Table %s already exists in the database %s and catalog %s",
- catalogTable, catalogDatabase, catalogName);
- }
- }
-
- return TableLoader.fromCatalog(flinkCatalog.getCatalogLoader(),
- TableIdentifier.of(catalogDatabase, catalogTable));
- }
-
- private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {
- Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
- return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
- }
-
- @Override
- public DynamicTableSource createDynamicTableSource(Context context) {
- ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
- CatalogTable catalogTable = loadCatalogTable(context);
- Map<String, String> tableProps = catalogTable.getOptions();
- TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
-
- TableLoader tableLoader;
- if (catalog != null) {
- tableLoader = createTableLoader(catalog, objectIdentifier.toObjectPath());
- } else {
- tableLoader = createTableLoader(catalogTable, tableProps, objectIdentifier.getDatabaseName(),
- objectIdentifier.getObjectName());
- }
-
- return new IcebergTableSource(tableLoader, tableSchema, tableProps, context.getConfiguration());
- }
-
- @Override
- public DynamicTableSink createDynamicTableSink(Context context) {
- ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
- CatalogTable catalogTable = loadCatalogTable(context);
- Map<String, String> tableProps = catalogTable.getOptions();
- TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
-
- TableLoader tableLoader;
- if (catalog != null) {
- tableLoader = createTableLoader(catalog, objectPath);
- } else {
- tableLoader = createTableLoader(catalogTable, tableProps, objectPath.getDatabaseName(),
- objectPath.getObjectName());
- }
-
- return new IcebergTableSink(tableLoader, tableSchema, catalogTable);
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- Set<ConfigOption<?>> options = Sets.newHashSet();
- options.add(CATALOG_TYPE);
- options.add(CATALOG_NAME);
- return options;
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = Sets.newHashSet();
- options.add(CATALOG_DATABASE);
- options.add(CATALOG_TABLE);
- options.add(ICEBERG_IGNORE_ALL_CHANGELOG);
- options.add(INLONG_METRIC);
- options.add(INLONG_AUDIT);
- return options;
- }
-
- @Override
- public String factoryIdentifier() {
- return FACTORY_IDENTIFIER;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.flink.IcebergTableSource;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesActionOption;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+
+/**
+ * Factory for creating configured instances of {@link IcebergTableSource} and {@link IcebergTableSink}.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
+ static final String FACTORY_IDENTIFIER = "dlc-inlong";
+
+ public static final ConfigOption<String> CATALOG_NAME =
+ ConfigOptions.key("catalog-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Catalog name");
+
+ public static final ConfigOption<String> CATALOG_TYPE =
+ ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Catalog type, the optional types are: custom, hadoop, hive.");
+
+ public static final ConfigOption<String> CATALOG_DATABASE =
+ ConfigOptions.key("catalog-database")
+ .stringType()
+ .defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
+ .withDescription("Database name managed in the iceberg catalog.");
+
+ public static final ConfigOption<String> CATALOG_TABLE =
+ ConfigOptions.key("catalog-table")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name managed in the underlying iceberg catalog and database.");
+
+ public static final ConfigOption<Boolean> ICEBERG_IGNORE_ALL_CHANGELOG =
+ ConfigOptions.key("sink.ignore.changelog")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Regard upsert delete as insert kind.");
+
+    // Flink 1.13.x changed the return type from the CatalogTable interface to ResolvedCatalogTable, which extends
+    // CatalogTable. Here we use the dynamic method loading approach to avoid adding an explicit CatalogTable or
+    // ResolvedCatalogTable class into the iceberg-flink-runtime jar, for compatibility purposes.
+ private static final DynMethods.UnboundMethod GET_CATALOG_TABLE = DynMethods.builder("getCatalogTable")
+ .impl(Context.class, "getCatalogTable")
+ .orNoop()
+ .build();
+
+ private final FlinkCatalog catalog;
+
+ public FlinkDynamicTableFactory() {
+ this.catalog = null;
+ }
+
+ public FlinkDynamicTableFactory(FlinkCatalog catalog) {
+ this.catalog = catalog;
+ }
+
+ private static CatalogTable loadCatalogTable(Context context) {
+ return GET_CATALOG_TABLE.invoke(context);
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
+ CatalogTable catalogTable = loadCatalogTable(context);
+ Map<String, String> tableProps = catalogTable.getOptions();
+ TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+
+ TableLoader tableLoader;
+ if (catalog != null) {
+ tableLoader = createTableLoader(catalog, objectIdentifier.toObjectPath());
+ } else {
+ tableLoader = createTableLoader(catalogTable, tableProps, objectIdentifier.getDatabaseName(),
+ objectIdentifier.getObjectName());
+ }
+
+ return new IcebergTableSource(tableLoader, tableSchema, tableProps, context.getConfiguration());
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
+ CatalogTable catalogTable = loadCatalogTable(context);
+ Map<String, String> tableProps = catalogTable.getOptions();
+ TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+ SyncRewriteDataFilesActionOption compactOption = new SyncRewriteDataFilesActionOption(tableProps);
+ MetricOption metricOption = null;
+ if (tableProps.containsKey(INLONG_METRIC.key())) {
+ metricOption = new MetricOption(
+ tableProps.getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()),
+ tableProps.getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()));
+ }
+ boolean appendMode = tableProps.containsKey(ICEBERG_IGNORE_ALL_CHANGELOG.key())
+ ? Boolean.parseBoolean(tableProps.get(ICEBERG_IGNORE_ALL_CHANGELOG.key()))
+ : ICEBERG_IGNORE_ALL_CHANGELOG.defaultValue();
+
+ TableLoader tableLoader;
+ if (catalog != null) {
+ tableLoader = createTableLoader(catalog, objectPath);
+ } else {
+ tableLoader = createTableLoader(catalogTable, tableProps, objectPath.getDatabaseName(),
+ objectPath.getObjectName());
+ }
+
+ return new IcebergTableSink(tableLoader, tableSchema, compactOption, metricOption, appendMode);
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = Sets.newHashSet();
+ options.add(CATALOG_TYPE);
+ options.add(CATALOG_NAME);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = Sets.newHashSet();
+ options.add(CATALOG_DATABASE);
+ options.add(CATALOG_TABLE);
+ options.add(ICEBERG_IGNORE_ALL_CHANGELOG);
+ options.add(INLONG_METRIC);
+ options.add(INLONG_AUDIT);
+ return options;
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return FACTORY_IDENTIFIER;
+ }
+
+ private static TableLoader createTableLoader(CatalogBaseTable catalogBaseTable,
+ Map<String, String> tableProps,
+ String databaseName,
+ String tableName) {
+ Configuration flinkConf = new Configuration();
+ tableProps.forEach(flinkConf::setString);
+
+ String catalogName = flinkConf.getString(CATALOG_NAME);
+ Preconditions.checkNotNull(catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
+
+ String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
+ Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");
+
+ String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
+ Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");
+
+ org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
+ FlinkCatalogFactory factory = new FlinkCatalogFactory();
+ FlinkCatalog flinkCatalog = (FlinkCatalog) factory.createCatalog(catalogName, tableProps, hadoopConf);
+ ObjectPath objectPath = new ObjectPath(catalogDatabase, catalogTable);
+
+ // Create database if not exists in the external catalog.
+ if (!flinkCatalog.databaseExists(catalogDatabase)) {
+ try {
+ flinkCatalog.createDatabase(catalogDatabase,
+ new CatalogDatabaseImpl(Maps.newHashMap(), null), true);
+ } catch (DatabaseAlreadyExistException e) {
+ throw new AlreadyExistsException(
+ e, "Database %s already exists in the iceberg catalog %s.", catalogDatabase, catalogName);
+ }
+ }
+
+ // Create table if not exists in the external catalog.
+ if (!flinkCatalog.tableExists(objectPath)) {
+ try {
+ flinkCatalog.createIcebergTable(objectPath, catalogBaseTable, true);
+ } catch (TableAlreadyExistException e) {
+ throw new AlreadyExistsException(e, "Table %s already exists in the database %s and catalog %s",
+ catalogTable, catalogDatabase, catalogName);
+ }
+ }
+
+ return TableLoader.fromCatalog(
+ flinkCatalog.getCatalogLoader(), TableIdentifier.of(catalogDatabase, catalogTable));
+ }
+
+ private static TableLoader createTableLoader(FlinkCatalog catalog, ObjectPath objectPath) {
+ Preconditions.checkNotNull(catalog, "Flink catalog cannot be null");
+ return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath));
+ }
+}
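A table served by this factory is normally declared through Flink SQL DDL. The sketch below is illustrative only: the 'connector' value stands in for FACTORY_IDENTIFIER, the catalog-* keys are assumed to follow the upstream Iceberg option names, and the DLC auth properties are omitted; 'sink.ignore.changelog', 'write.compact.enable', 'inlong.metric' and 'inlong.audit' are the options this patch documents.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DlcIcebergDdlSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // '<factory-identifier>' stands for FACTORY_IDENTIFIER of this factory;
            // the catalog-* keys below are assumptions, not taken from this patch.
            tableEnv.executeSql(
                    "CREATE TABLE dlc_sink (id BIGINT, name STRING) WITH ("
                            + " 'connector' = '<factory-identifier>',"
                            + " 'catalog-type' = '<dlc-catalog-type>',"
                            + " 'catalog-name' = 'dlc_catalog',"
                            + " 'catalog-database' = 'demo_db',"
                            + " 'catalog-table' = 'demo_table',"
                            + " 'sink.ignore.changelog' = 'true',"
                            + " 'write.compact.enable' = 'true',"
                            + " 'inlong.metric' = '<inlong-metric-labels>',"
                            + " 'inlong.audit' = '<audit-host:port>')");
        }
    }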
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
similarity index 74%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
index b5f3bef9e..112b6a36b 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
@@ -17,11 +17,10 @@
* under the License.
*/
-package org.apache.inlong.sort.iceberg;
+package org.apache.inlong.sort.iceberg.flink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
-import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -31,39 +30,51 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.inlong.sort.iceberg.sink.FlinkSink;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesActionOption;
+import org.apache.inlong.sort.iceberg.flink.sink.FlinkSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
-import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
-import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
-
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ * Add an option `sink.ignore.changelog` to support insert-only mode without a primary key.
+ * Add a table property `write.compact.enable` to support small file compaction.
+ * Add options `inlong.metric` and `inlong.audit` to support collecting inlong metrics and audit data.
+ */
public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
-
private static final Logger LOG = LoggerFactory.getLogger(IcebergTableSink.class);
-
private final TableLoader tableLoader;
private final TableSchema tableSchema;
-
- private final CatalogTable catalogTable;
+ private final SyncRewriteDataFilesActionOption compactAction;
+ private final MetricOption metricOption;
+ private final boolean appendMode;
private boolean overwrite = false;
private IcebergTableSink(IcebergTableSink toCopy) {
this.tableLoader = toCopy.tableLoader;
this.tableSchema = toCopy.tableSchema;
+ this.compactAction = toCopy.compactAction;
+ this.metricOption = toCopy.metricOption;
this.overwrite = toCopy.overwrite;
- this.catalogTable = toCopy.catalogTable;
+ this.appendMode = toCopy.appendMode;
}
- public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema, CatalogTable catalogTable) {
+ public IcebergTableSink(
+ TableLoader tableLoader,
+ TableSchema tableSchema,
+ SyncRewriteDataFilesActionOption compactAction,
+ MetricOption metricOption,
+ boolean appendMode) {
this.tableLoader = tableLoader;
this.tableSchema = tableSchema;
- this.catalogTable = catalogTable;
+ this.compactAction = compactAction;
+ this.metricOption = metricOption;
+ this.appendMode = appendMode;
}
@Override
@@ -80,8 +91,9 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
.tableSchema(tableSchema)
.equalityFieldColumns(equalityColumns)
.overwrite(overwrite)
- .metric(catalogTable.getOptions().getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()),
- catalogTable.getOptions().getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()))
+ .metric(metricOption)
+ .appendMode(appendMode)
+ .compact(compactAction)
.append();
}
@@ -92,8 +104,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
- if (org.apache.flink.configuration.Configuration.fromMap(catalogTable.getOptions())
- .get(ICEBERG_IGNORE_ALL_CHANGELOG)) {
+ if (appendMode) {
LOG.warn("Iceberg sink receive all changelog record. "
+ "Regard any other record as insert-only record.");
return ChangelogMode.all();
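The sink above assembles the FlinkSink builder chain inside its runtime provider; a minimal standalone sketch of the same wiring, assuming a DataStream<RowData>, a TableLoader and a TableSchema are already available and using 'id' as an illustrative equality column:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesActionOption;
    import org.apache.inlong.sort.iceberg.flink.sink.FlinkSink;

    import java.util.Collections;
    import java.util.Map;

    public class DlcFlinkSinkSketch {
        public static void buildSink(DataStream<RowData> rowDataStream,
                                     TableLoader tableLoader,
                                     TableSchema tableSchema,
                                     Map<String, String> tableProps) {
            // Mirror what createDynamicTableSink assembles from the table options.
            MetricOption metricOption = new MetricOption(
                    tableProps.get("inlong.metric"), tableProps.get("inlong.audit"));
            SyncRewriteDataFilesActionOption compactOption =
                    new SyncRewriteDataFilesActionOption(tableProps);

            FlinkSink.forRowData(rowDataStream)
                    .tableLoader(tableLoader)
                    .tableSchema(tableSchema)
                    .equalityFieldColumns(Collections.singletonList("id")) // illustrative key column
                    .overwrite(false)
                    .metric(metricOption)
                    .appendMode(false)
                    .compact(compactOption)
                    .append();
        }
    }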
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/RewriteResult.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/RewriteResult.java
new file mode 100644
index 000000000..3e9e421ae
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/RewriteResult.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.actions;
+
+public class RewriteResult {
+ private String result;
+
+ public RewriteResult(String result) {
+ this.result = result;
+ }
+
+ public String getResult() {
+ return result;
+ }
+}
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/SyncRewriteDataFilesAction.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/SyncRewriteDataFilesAction.java
new file mode 100644
index 000000000..e9b83e9f4
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/SyncRewriteDataFilesAction.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.actions;
+
+import org.apache.iceberg.actions.Action;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Do rewrite action with dlc Spark SQL.
+ */
+public class SyncRewriteDataFilesAction implements
+ Action<SyncRewriteDataFilesAction, RewriteResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SyncRewriteDataFilesAction.class);
+ private static final String DLC_JDBC_CLASS = "com.tencent.cloud.dlc.jdbc.DlcDriver";
+
+ private SyncRewriteDataFilesActionOption options;
+ private AtomicInteger snapshotCounter;
+
+ public SyncRewriteDataFilesAction(SyncRewriteDataFilesActionOption option) {
+ this.snapshotCounter = new AtomicInteger();
+ this.options = option;
+ }
+
+ @Override
+ public SyncRewriteDataFilesAction option(String name, String value) {
+ this.options.option(name, value);
+ return this;
+ }
+
+ @Override
+ public SyncRewriteDataFilesAction options(Map<String, String> options) {
+ this.options.options(options);
+ return this;
+ }
+
+ @Override
+ public RewriteResult execute() {
+ if (!shouldExecute()) {
+ return new RewriteResult("Skip this compaction.");
+ }
+
+ Connection connection = buildConnection();
+ if (connection == null) {
+ LOG.error("Can't get DLC JDBC Connection");
+ return new RewriteResult("fail.");
+ }
+
+ String rewriteTableSql = options.rewriteSql();
+ try {
+ Statement statement = connection.createStatement();
+ LOG.info("Do compact: {}", rewriteTableSql);
+ boolean firstIsResultSet = statement.execute(rewriteTableSql);
+ if (firstIsResultSet) {
+ ResultSet rs = statement.getResultSet();
+ ResultSetMetaData rsmd = rs.getMetaData();
+ int columnsNumber = rsmd.getColumnCount();
+ while (rs.next()) {
+ StringBuilder lineResult = new StringBuilder();
+ for (int i = 1; i <= columnsNumber; i++) {
+ if (i > 1) {
+ lineResult.append(", ");
+ }
+ lineResult.append(rsmd.getColumnName(i) + ":" + rs.getString(i));
+ }
+ LOG.info("[Result:]{}", lineResult);
+ }
+ } else {
+ LOG.info("[Result:] there is no output.");
+ }
+ statement.close();
+ connection.close();
+ } catch (SQLException e) {
+ LOG.warn("[Result:] Failed to execute rewrite SQL ({}).", rewriteTableSql, e);
+ return new RewriteResult("fail.");
+ }
+ return new RewriteResult("success.");
+ }
+
+ private boolean shouldExecute() {
+ return snapshotCounter.incrementAndGet() % options.interval() == 0;
+ }
+
+ private Connection buildConnection() {
+ Connection connection = null;
+ String url = options.url();
+ try {
+ Class.forName(DLC_JDBC_CLASS);
+ connection = DriverManager.getConnection(
+ url,
+ options.secretId(),
+ options.secretKey());
+ // get meta data
+ DatabaseMetaData metaData = connection.getMetaData();
+ LOG.info("DLC product = {}, DLC jdbc version = {}, DLC jdbc = '{}'",
+ metaData.getDatabaseProductName(), metaData.getDriverMajorVersion(), url);
+ } catch (SQLException e) {
+ LOG.error("Failed to create connection. Please check the configuration. Request URL: {}.", url, e);
+ } catch (ClassNotFoundException e) {
+ LOG.error("DLC JDBC driver class not found. Please check the classpath ({}).",
+ System.getProperty("java.class.path"), e);
+ }
+ return connection;
+ }
+}
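A rough usage sketch of this action, assuming illustrative property values (in the connector the option map is assembled from the table options by the factory, and execute() is driven from the commit path):

    import org.apache.inlong.sort.iceberg.flink.actions.RewriteResult;
    import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesAction;
    import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesActionOption;

    import java.util.HashMap;
    import java.util.Map;

    public class CompactActionSketch {
        public static void main(String[] args) {
            Map<String, String> tableProps = new HashMap<>();
            // Illustrative: this JDBC url key is the one read by SyncRewriteDataFilesActionOption;
            // the DLC auth properties (secret id/key, region) would normally be present as well.
            tableProps.put("qcloud.dlc.jdbc.url", "jdbc:dlc:<endpoint>?task_type=SparkSQLTask");

            SyncRewriteDataFilesActionOption option = new SyncRewriteDataFilesActionOption(tableProps);
            SyncRewriteDataFilesAction action = new SyncRewriteDataFilesAction(option)
                    .option("db_name", "demo_db")
                    .option("table_name", "demo_table");

            // execute() skips compaction until the configured interval is reached; when it runs,
            // it submits the rewrite_data_files call through the DLC JDBC driver.
            RewriteResult result = action.execute();
            System.out.println(result.getResult());
        }
    }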
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/SyncRewriteDataFilesActionOption.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/SyncRewriteDataFilesActionOption.java
new file mode 100644
index 000000000..96d59623c
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/actions/SyncRewriteDataFilesActionOption.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.actions;
+
+import com.qcloud.dlc.common.Constants;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.flink.FlinkCatalogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.ACTION_AUTO_COMPACT_OPTIONS;
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.COMPACT_INTERVAL;
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.COMPACT_INTERVAL_DEFAULT;
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.COMPACT_PREFIX;
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.COMPACT_RESOUCE_POOL;
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.COMPACT_RESOUCE_POOL_DEFAULT;
+import static org.apache.inlong.sort.iceberg.flink.FlinkDynamicTableFactory.CATALOG_DATABASE;
+import static org.apache.inlong.sort.iceberg.flink.FlinkDynamicTableFactory.CATALOG_TABLE;
+
+public class SyncRewriteDataFilesActionOption implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(SyncRewriteDataFilesActionOption.class);
+
+ private Map<String, String> properties;
+
+ public static final String URL = "url";
+
+ public static final String URL_REGION = "region";
+ public static final String URL_REGION_DEFAULT = "ap-beijing";
+
+ public static final String URL_DEFAULT_DATABASE = "database_name";
+ public static final String URL_DEFAULT_DATABASE_DEFAULT = "default";
+
+ public static final String URL_ENDPOINT = "endpoint";
+ public static final String URL_ENDPOINT_DEFAULT = "dlc.tencentcloudapi.com";
+
+ public static final String URL_TASK_TYPE = "task_type";
+ public static final String URL_TASK_TYPE_DEFAULT = "SparkSQLTask";
+
+ public static final String URL_DATA_SOURCE = "datasource_connection_name";
+ public static final String URL_DATA_SOURCE_DEFAULT = "DataLakeCatalog";
+
+ public static final String URL_DATA_RESOURCE_NAME = "data_engine_name";
+
+ public static final String AUTH_SECRET_ID = "secret_id";
+ public static final String AUTH_SECRET_KEY = "secret_key";
+
+ public static final String REWRITE_DB_NAME = "db_name";
+ public static final String REWRITE_TABLE_NAME = "table_name";
+
+ public SyncRewriteDataFilesActionOption(Map<String, String> tableProperties) {
+ Preconditions.checkNotNull(CATALOG_DATABASE.key());
+ Preconditions.checkNotNull(CATALOG_TABLE.key());
+ Preconditions.checkNotNull(Constants.DLC_SECRET_ID_CONF);
+ Preconditions.checkNotNull(Constants.DLC_SECRET_KEY_CONF);
+ this.properties = new HashMap<>();
+ Optional.ofNullable(tableProperties.get("qcloud.dlc.jdbc.url")).ifPresent(v -> properties.put(URL, v));
+ properties.put(URL_REGION, tableProperties.get(Constants.DLC_REGION_CONF));
+ properties.put(AUTH_SECRET_ID, tableProperties.get(Constants.DLC_SECRET_ID_CONF));
+ properties.put(AUTH_SECRET_KEY, tableProperties.get(Constants.DLC_SECRET_KEY_CONF));
+ properties.put(URL_DEFAULT_DATABASE, tableProperties.get(FlinkCatalogFactory.DEFAULT_DATABASE));
+ properties.put(REWRITE_DB_NAME, tableProperties.get(CATALOG_DATABASE.key()));
+ properties.put(REWRITE_TABLE_NAME, tableProperties.get(CATALOG_TABLE.key()));
+ }
+
+ public void option(String name, String value) {
+ properties.put(name, value);
+ }
+
+ public void options(Map<String, String> newOptions) {
+ properties.putAll(newOptions);
+ }
+
+ public String url() {
+ String jdbcPrefix = "jdbc:dlc:";
+ String endpoint;
+ Map<String, String> urlParams = new HashMap<>();
+ if (properties.get(URL) != null) {
+ String url = properties.get(URL);
+ int splitPoint = url.indexOf("?") == -1 ? url.length() : url.indexOf("?");
+ endpoint = url.substring(jdbcPrefix.length(), splitPoint);
+ Stream.of(url.substring(Math.min(splitPoint + 1, url.length())).split("&"))
+ .forEach(kv -> {
+ String[] param = kv.split("=");
+ if (param.length == 2) {
+ urlParams.put(param[0], param[1]);
+ }
+ });
+ Optional.ofNullable(properties.get(COMPACT_RESOUCE_POOL))
+ .ifPresent(v -> urlParams.put(URL_DATA_RESOURCE_NAME, v));
+ } else {
+ endpoint = properties.getOrDefault(URL_ENDPOINT, URL_ENDPOINT_DEFAULT);
+ urlParams.put(URL_TASK_TYPE, properties.getOrDefault(URL_TASK_TYPE, URL_TASK_TYPE_DEFAULT));
+ urlParams.put(URL_DEFAULT_DATABASE,
+ properties.getOrDefault(URL_DEFAULT_DATABASE, URL_DEFAULT_DATABASE_DEFAULT));
+ urlParams.put(URL_DATA_SOURCE, properties.getOrDefault(URL_DATA_SOURCE, URL_DATA_SOURCE_DEFAULT));
+ urlParams.put(URL_REGION, properties.getOrDefault(URL_REGION, URL_REGION_DEFAULT));
+ urlParams.put(URL_DATA_RESOURCE_NAME,
+ properties.getOrDefault(COMPACT_RESOUCE_POOL, COMPACT_RESOUCE_POOL_DEFAULT));
+
+ }
+ List<String> urlParamsList =
+ urlParams.entrySet().stream().map(kv -> kv.getKey() + "=" + kv.getValue()).collect(Collectors.toList());
+ return jdbcPrefix + endpoint + "?" + String.join("&", urlParamsList);
+ }
+
+ public String secretId() {
+ return properties.get(AUTH_SECRET_ID);
+ }
+
+ public String secretKey() {
+ return properties.get(AUTH_SECRET_KEY);
+ }
+
+ public String rewriteSql() {
+ String dbName = properties.get(REWRITE_DB_NAME);
+ String tableName = properties.get(REWRITE_TABLE_NAME);
+ Preconditions.checkNotNull(dbName);
+ Preconditions.checkNotNull(tableName);
+ String wholeTableName = String.format("%s.%s", dbName, tableName);
+ String rewriteOptions = String.join(",",
+ ACTION_AUTO_COMPACT_OPTIONS.stream()
+ .filter(properties::containsKey)
+ .map(k -> String.format("'%s', '%s'", k.substring(COMPACT_PREFIX.length()), properties.get(k)))
+ .collect(Collectors.toList()));
+ String rewriteTableSql;
+ if (rewriteOptions.isEmpty()) {
+ rewriteTableSql = String.format(
+ "CALL `DataLakeCatalog`.`system`.rewrite_data_files (`table` => '%s')",
+ wholeTableName);
+ } else {
+ rewriteTableSql =
+ String.format(
+ "CALL `DataLakeCatalog`.`system`.rewrite_data_files"
+ + "(`table` => '%s', options => map(%s))",
+ wholeTableName, rewriteOptions);
+ }
+ return rewriteTableSql;
+ }
+
+ public int interval() {
+ return PropertyUtil.propertyAsInt(properties, COMPACT_INTERVAL, COMPACT_INTERVAL_DEFAULT);
+ }
+}
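For orientation, the statement produced by rewriteSql() follows the CALL pattern hard-coded above; a small sketch with only the db_name/table_name options set:

    import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesActionOption;

    import java.util.HashMap;

    public class RewriteSqlSketch {
        public static void main(String[] args) {
            SyncRewriteDataFilesActionOption opt =
                    new SyncRewriteDataFilesActionOption(new HashMap<>());
            opt.option("db_name", "demo_db");
            opt.option("table_name", "demo_table");
            // With no auto-compact options configured this prints:
            // CALL `DataLakeCatalog`.`system`.rewrite_data_files (`table` => 'demo_db.demo_table')
            System.out.println(opt.rewriteSql());
        }
    }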
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/BaseDeltaTaskWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/BaseDeltaTaskWriter.java
new file mode 100644
index 000000000..702353999
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.data.RowDataProjection;
+import org.apache.iceberg.io.BaseTaskWriter;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
+
+ private final Schema schema;
+ private final Schema deleteSchema;
+ private final RowDataWrapper wrapper;
+ private final RowDataWrapper keyWrapper;
+ private final RowDataProjection keyProjection;
+ private final boolean upsert;
+
+ BaseDeltaTaskWriter(PartitionSpec spec,
+ FileFormat format,
+ FileAppenderFactory<RowData> appenderFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSize,
+ Schema schema,
+ RowType flinkSchema,
+ List<Integer> equalityFieldIds,
+ boolean upsert) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+ this.schema = schema;
+ this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
+ this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+ this.upsert = upsert;
+ this.keyWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
+ this.keyProjection = RowDataProjection.create(schema, deleteSchema);
+ }
+
+ abstract RowDataDeltaWriter route(RowData row);
+
+ RowDataWrapper wrapper() {
+ return wrapper;
+ }
+
+ @Override
+ public void write(RowData row) throws IOException {
+ RowDataDeltaWriter writer = route(row);
+
+ switch (row.getRowKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+ if (upsert) {
+ writer.deleteKey(keyProjection.wrap(row));
+ }
+ writer.write(row);
+ break;
+
+ case UPDATE_BEFORE:
+ if (upsert) {
+ break; // in upsert mode UPDATE_BEFORE is not needed; skip it so the same row is not deleted twice
+ }
+ writer.delete(row);
+ break;
+ case DELETE:
+ writer.delete(row);
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
+ }
+ }
+
+ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
+ RowDataDeltaWriter(PartitionKey partition) {
+ super(partition, schema, deleteSchema);
+ }
+
+ @Override
+ protected StructLike asStructLike(RowData data) {
+ return wrapper.wrap(data);
+ }
+
+ @Override
+ protected StructLike asStructLikeKey(RowData data) {
+ return keyWrapper.wrap(data);
+ }
+ }
+}
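The delete path above is keyed on deleteSchema, which is just the table schema projected onto the equality field ids; a small standalone sketch of that derivation (field ids and columns are illustrative):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.apache.iceberg.types.TypeUtil;
    import org.apache.iceberg.types.Types;

    public class DeleteSchemaSketch {
        public static void main(String[] args) {
            Schema schema = new Schema(
                    Types.NestedField.required(1, "id", Types.LongType.get()),
                    Types.NestedField.optional(2, "name", Types.StringType.get()));

            // Same call as in BaseDeltaTaskWriter: keep only the equality field ("id").
            Schema deleteSchema = TypeUtil.select(schema, Sets.newHashSet(1));
            System.out.println(deleteSchema.asStruct());
        }
    }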
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifests.java
similarity index 95%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifests.java
index 6b52dff8b..d9b21faf2 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifests.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.sort.iceberg.sink;
+package org.apache.inlong.sort.iceberg.flink.sink;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -25,6 +25,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import java.util.List;
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
class DeltaManifests {
private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0];
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifestsSerializer.java
similarity index 97%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifestsSerializer.java
index fb2187d76..3d6a4f71f 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifestsSerializer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.sort.iceberg.sink;
+package org.apache.inlong.sort.iceberg.flink.sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.iceberg.ManifestFile;
@@ -30,6 +30,9 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
private static final int VERSION_1 = 1;
private static final int VERSION_2 = 2;
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkManifestUtil.java
similarity index 94%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkManifestUtil.java
index 1cbe0d70b..6a444d1d3 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkManifestUtil.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.sort.iceberg.sink;
+package org.apache.inlong.sort.iceberg.flink.sink;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -38,6 +38,9 @@ import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
class FlinkManifestUtil {
private static final int FORMAT_V2 = 2;
private static final Long DUMMY_SNAPSHOT_ID = 0L;
@@ -62,10 +65,11 @@ class FlinkManifestUtil {
}
}
- static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
- long attemptNumber) {
+ static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, String operatorUniqueId,
+ int subTaskId, long attemptNumber) {
TableOperations ops = ((HasTableOperations) table).operations();
- return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
+ return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, operatorUniqueId,
+ subTaskId, attemptNumber);
}
static DeltaManifests writeCompletedFiles(WriteResult result,
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
similarity index 78%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
index d86857338..0b48fb169 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
@@ -1,502 +1,532 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.inlong.sort.iceberg.sink;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DistributionMode;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.SerializableTable;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
-import org.apache.iceberg.flink.sink.TaskWriterFactory;
-import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
-import org.apache.iceberg.io.WriteResult;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PropertyUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.function.Function;
-
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
-import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
-import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
-import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
-import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
-
-public class FlinkSink {
- private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
-
- private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
- private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();
-
- private FlinkSink() {
- }
-
- /**
- * Initialize a {@link Builder} to export the data from generic input data stream into iceberg table. We use
- * {@link RowData} inside the sink connector, so users need to provide a mapper function and a
- * {@link TypeInformation} to convert those generic records to a RowData DataStream.
- *
- * @param input the generic source input data stream.
- * @param mapper function to convert the generic data to {@link RowData}
- * @param outputType to define the {@link TypeInformation} for the input data.
- * @param <T> the data type of records.
- * @return {@link Builder} to connect the iceberg table.
- */
- public static <T> Builder builderFor(DataStream<T> input,
- MapFunction<T, RowData> mapper,
- TypeInformation<RowData> outputType) {
- return new Builder().forMapperOutputType(input, mapper, outputType);
- }
-
- /**
- * Initialize a {@link Builder} to export the data from input data stream with {@link Row}s into iceberg table.
- * We use {@link RowData} inside the sink connector, so users need to provide a {@link TableSchema} for builder to
- * convert those {@link Row}s to a {@link RowData} DataStream.
- *
- * @param input the source input data stream with {@link Row}s.
- * @param tableSchema defines the {@link TypeInformation} for input data.
- * @return {@link Builder} to connect the iceberg table.
- */
- public static Builder forRow(DataStream<Row> input, TableSchema tableSchema) {
- RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
- DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
-
- DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes);
- return builderFor(input, rowConverter::toInternal, FlinkCompatibilityUtil.toTypeInfo(rowType))
- .tableSchema(tableSchema);
- }
-
- /**
- * Initialize a {@link Builder} to export the data from input data stream with {@link RowData}s into iceberg table.
- *
- * @param input the source input data stream with {@link RowData}s.
- * @return {@link Builder} to connect the iceberg table.
- */
- public static Builder forRowData(DataStream<RowData> input) {
- return new Builder().forRowData(input);
- }
-
- public static class Builder {
- private Function<String, DataStream<RowData>> inputCreator = null;
- private TableLoader tableLoader;
- private Table table;
- private TableSchema tableSchema;
- private boolean overwrite = false;
- private DistributionMode distributionMode = null;
- private Integer writeParallelism = null;
- private boolean upsert = false;
- private List<String> equalityFieldColumns = null;
- private String uidPrefix = null;
- private String inlongMetric = null;
- private String auditHostAndPorts = null;
-
- private Builder() {
- }
-
- private Builder forRowData(DataStream<RowData> newRowDataInput) {
- this.inputCreator = ignored -> newRowDataInput;
- return this;
- }
-
- private <T> Builder forMapperOutputType(DataStream<T> input,
- MapFunction<T, RowData> mapper,
- TypeInformation<RowData> outputType) {
- this.inputCreator = newUidPrefix -> {
- if (newUidPrefix != null) {
- return input.map(mapper, outputType)
- .name(operatorName(newUidPrefix))
- .uid(newUidPrefix + "-mapper");
- } else {
- return input.map(mapper, outputType);
- }
- };
- return this;
- }
-
- /**
- * This iceberg {@link Table} instance is used for initializing {@link IcebergStreamWriter} which will write all
- * the records into {@link DataFile}s and emit them to downstream operator. Providing a table would avoid so
- * many table loading from each separate task.
- *
- * @param newTable the loaded iceberg table instance.
- * @return {@link Builder} to connect the iceberg table.
- */
- public Builder table(Table newTable) {
- this.table = newTable;
- return this;
- }
-
- /**
- * The table loader is used for loading tables in {@link IcebergFilesCommitter} lazily, we need this loader
- * because {@link Table} is not serializable and could not just use the loaded table from Builder#table in the
- * remote task manager.
- *
- * @param newTableLoader to load iceberg table inside tasks.
- * @return {@link Builder} to connect the iceberg table.
- */
- public Builder tableLoader(TableLoader newTableLoader) {
- this.tableLoader = newTableLoader;
- return this;
- }
-
- public Builder tableSchema(TableSchema newTableSchema) {
- this.tableSchema = newTableSchema;
- return this;
- }
-
- public Builder overwrite(boolean newOverwrite) {
- this.overwrite = newOverwrite;
- return this;
- }
-
- /**
- * Add metric output for iceberg writer
- * @param inlongMetric
- * @param auditHostAndPorts
- * @return
- */
- public Builder metric(String inlongMetric, String auditHostAndPorts) {
- this.inlongMetric = inlongMetric;
- this.auditHostAndPorts = auditHostAndPorts;
- return this;
- }
-
- /**
- * Configure the write {@link DistributionMode} that the flink sink will use. Currently, flink support
- * {@link DistributionMode#NONE} and {@link DistributionMode#HASH}.
- *
- * @param mode to specify the write distribution mode.
- * @return {@link Builder} to connect the iceberg table.
- */
- public Builder distributionMode(DistributionMode mode) {
- Preconditions.checkArgument(!DistributionMode.RANGE.equals(mode),
- "Flink does not support 'range' write distribution mode now.");
- this.distributionMode = mode;
- return this;
- }
-
- /**
- * Configuring the write parallel number for iceberg stream writer.
- *
- * @param newWriteParallelism the number of parallel iceberg stream writer.
- * @return {@link Builder} to connect the iceberg table.
- */
- public Builder writeParallelism(int newWriteParallelism) {
- this.writeParallelism = newWriteParallelism;
- return this;
- }
-
- /**
- * All INSERT/UPDATE_AFTER events from input stream will be transformed to UPSERT events, which means it will
- * DELETE the old records and then INSERT the new records. In partitioned table, the partition fields should be
- * a subset of equality fields, otherwise the old row that located in partition-A could not be deleted by the
- * new row that located in partition-B.
- *
- * @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
- * @return {@link Builder} to connect the iceberg table.
- */
- public Builder upsert(boolean enabled) {
- this.upsert = enabled;
- return this;
- }
-
- /**
- * Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
- *
- * @param columns defines the iceberg table's key.
- * @return {@link Builder} to connect the iceberg table.
- */
- public Builder equalityFieldColumns(List<String> columns) {
- this.equalityFieldColumns = columns;
- return this;
- }
-
- /**
- * Set the uid prefix for FlinkSink operators. Note that FlinkSink internally consists of multiple operators
- * (like writer, committer, dummy sink etc.) Actually operator uid will be appended with a suffix like
- * "uidPrefix-writer".
- * <br><br>
- * If provided, this prefix is also applied to operator names.
- * <br><br>
- * Flink auto generates operator uid if not set explicitly. It is a recommended
- * <a href="https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/">
- * best-practice to set uid for all operators</a> before deploying to production. Flink has an option to {@code
- * pipeline.auto-generate-uid=false} to disable auto-generation and force explicit setting of all operator uid.
- * <br><br>
- * Be careful with setting this for an existing job, because now we are changing the operator uid from an
- * auto-generated one to this new value. When deploying the change with a checkpoint, Flink won't be able to
- * restore the previous Flink sink operator state (more specifically the committer operator state). You need to
- * use {@code --allowNonRestoredState} to ignore the previous sink state. During restore Flink sink state is
- * used to check if last commit was actually successful or not. {@code --allowNonRestoredState} can lead to data
- * loss if the Iceberg commit failed in the last completed checkpoint.
- *
- * @param newPrefix prefix for Flink sink operator uid and name
- * @return {@link Builder} to connect the iceberg table.
- */
- public Builder uidPrefix(String newPrefix) {
- this.uidPrefix = newPrefix;
- return this;
- }
-
- private <T> DataStreamSink<T> chainIcebergOperators() {
- Preconditions.checkArgument(inputCreator != null,
- "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
- Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");
-
- DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);
-
- if (table == null) {
- tableLoader.open();
- try (TableLoader loader = tableLoader) {
- this.table = loader.loadTable();
- } catch (IOException e) {
- throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
- }
- }
-
- // Convert the requested flink table schema to flink row type.
- RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
-
- // Distribute the records from input data stream based on the write.distribution-mode.
- DataStream<RowData> distributeStream = distributeDataStream(
- rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
-
- // Add parallel writers that append rows to files
- SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType);
-
- // Add single-parallelism committer that commits files
- // after successful checkpoint or end of input
- SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
-
- // Add dummy discard sink
- return appendDummySink(committerStream);
- }
-
- /**
- * Append the iceberg sink operators to write records to iceberg table.
- *
- * @return {@link DataStreamSink} for sink.
- * @deprecated this will be removed in 0.14.0; use {@link #append()} because its returned {@link DataStreamSink}
- * has a more correct data type.
- */
- @Deprecated
- public DataStreamSink<RowData> build() {
- return chainIcebergOperators();
- }
-
- /**
- * Append the iceberg sink operators to write records to iceberg table.
- *
- * @return {@link DataStreamSink} for sink.
- */
- public DataStreamSink<Void> append() {
- return chainIcebergOperators();
- }
-
- private String operatorName(String suffix) {
- return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
- }
-
- @SuppressWarnings("unchecked")
- private <T> DataStreamSink<T> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
- DataStreamSink<T> resultStream = committerStream
- .addSink(new DiscardingSink())
- .name(operatorName(String.format("IcebergSink %s", this.table.name())))
- .setParallelism(1);
- if (uidPrefix != null) {
- resultStream = resultStream.uid(uidPrefix + "-dummysink");
- }
- return resultStream;
- }
-
- private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
- IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
- SingleOutputStreamOperator<Void> committerStream = writerStream
- .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
- .setParallelism(1)
- .setMaxParallelism(1);
- if (uidPrefix != null) {
- committerStream = committerStream.uid(uidPrefix + "-committer");
- }
- return committerStream;
- }
-
- private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
- // Find out the equality field id list based on the user-provided equality field column names.
- List<Integer> equalityFieldIds = Lists.newArrayList();
- if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
- for (String column : equalityFieldColumns) {
- org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
- Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
- column, table.schema());
- equalityFieldIds.add(field.fieldId());
- }
- }
-
- // Fallback to use upsert mode parsed from table properties if don't specify in job level.
- boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
- UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
-
- // Validate the equality fields and partition fields if we enable the upsert mode.
- if (upsertMode) {
- Preconditions.checkState(!overwrite,
- "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
- Preconditions.checkState(!equalityFieldIds.isEmpty(),
- "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
- if (!table.spec().isUnpartitioned()) {
- for (PartitionField partitionField : table.spec().fields()) {
- Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
- "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
- partitionField, equalityFieldColumns);
- }
- }
- }
-
- IcebergStreamWriter<RowData> streamWriter = createStreamWriter(
- table, flinkRowType, equalityFieldIds, upsertMode, inlongMetric, auditHostAndPorts);
-
- int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
- SingleOutputStreamOperator<WriteResult> writerStream = input
- .transform(operatorName(ICEBERG_STREAM_WRITER_NAME),
- TypeInformation.of(WriteResult.class),
- streamWriter)
- .setParallelism(parallelism);
- if (uidPrefix != null) {
- writerStream = writerStream.uid(uidPrefix + "-writer");
- }
- return writerStream;
- }
-
- private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
- Map<String, String> properties,
- PartitionSpec partitionSpec,
- Schema iSchema,
- RowType flinkRowType) {
- DistributionMode writeMode;
- if (distributionMode == null) {
- // Fallback to use distribution mode parsed from table properties if don't specify in job level.
- String modeName = PropertyUtil.propertyAsString(properties,
- WRITE_DISTRIBUTION_MODE,
- WRITE_DISTRIBUTION_MODE_NONE);
-
- writeMode = DistributionMode.fromName(modeName);
- } else {
- writeMode = distributionMode;
- }
-
- switch (writeMode) {
- case NONE:
- return input;
-
- case HASH:
- if (partitionSpec.isUnpartitioned()) {
- return input;
- } else {
- return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
- }
-
- case RANGE:
- LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
- WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
- return input;
-
- default:
- throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
- }
- }
- }
-
- static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
- if (requestedSchema != null) {
- // Convert the flink schema to iceberg schema firstly, then reassign ids to match the existing iceberg
- // schema.
- Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
- TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
-
- // We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will be promoted
- // to iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT (backend by 1 'byte'),
- // we will read 4 bytes rather than 1 byte, it will mess up the byte array in BinaryRowData. So here we must
- // use flink schema.
- return (RowType) requestedSchema.toRowDataType().getLogicalType();
- } else {
- return FlinkSchemaUtil.convert(schema);
- }
- }
-
- static IcebergStreamWriter<RowData> createStreamWriter(Table table,
- RowType flinkRowType,
- List<Integer> equalityFieldIds,
- boolean upsert,
- String inlongMetric,
- String auditHostAndPorts) {
- Preconditions.checkArgument(table != null, "Iceberg table should't be null");
- Map<String, String> props = table.properties();
- long targetFileSize = getTargetFileSizeBytes(props);
- FileFormat fileFormat = getFileFormat(props);
-
- Table serializableTable = SerializableTable.copyOf(table);
- TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
- serializableTable, flinkRowType, targetFileSize,
- fileFormat, equalityFieldIds, upsert);
-
- return new IcebergStreamWriter<>(table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts);
- }
-
- private static FileFormat getFileFormat(Map<String, String> properties) {
- String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
- return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
- }
-
- private static long getTargetFileSizeBytes(Map<String, String> properties) {
- return PropertyUtil.propertyAsLong(properties,
- WRITE_TARGET_FILE_SIZE_BYTES,
- WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.sink;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesActionOption;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
+import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ * Add an option `sink.ignore.changelog` to support insert-only mode without a primary key.
+ * Add a table property `write.compact.enable` to support small file compaction.
+ * Add options `inlong.metric` and `inlong.audit` to support collecting inlong metrics and audit data.
+ */
+public class FlinkSink {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
+
+ private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
+ private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();
+
+ private FlinkSink() {
+ }
+
+ /**
+ * Initialize a {@link FlinkSink.Builder} to export the data from generic input data stream into iceberg table.
+ * We use {@link RowData} inside the sink connector, so users need to provide a mapper function and a
+ * {@link TypeInformation} to convert those generic records to a RowData DataStream.
+ *
+ * @param input the generic source input data stream.
+ * @param mapper function to convert the generic data to {@link RowData}
+ * @param outputType to define the {@link TypeInformation} for the input data.
+ * @param <T> the data type of records.
+ * @return {@link FlinkSink.Builder} to connect the iceberg table.
+ */
+ public static <T> FlinkSink.Builder builderFor(DataStream<T> input,
+ MapFunction<T, RowData> mapper,
+ TypeInformation<RowData> outputType) {
+ return new FlinkSink.Builder().forMapperOutputType(input, mapper, outputType);
+ }
+
+ /**
+ * Initialize a {@link FlinkSink.Builder} to export the data from input data stream with {@link Row}s into iceberg
+ * table. We use {@link RowData} inside the sink connector, so users need to provide a {@link TableSchema} for
+ * builder to convert those {@link Row}s to a {@link RowData} DataStream.
+ *
+ * @param input the source input data stream with {@link Row}s.
+ * @param tableSchema defines the {@link TypeInformation} for input data.
+ * @return {@link FlinkSink.Builder} to connect the iceberg table.
+ */
+ public static FlinkSink.Builder forRow(DataStream<Row> input, TableSchema tableSchema) {
+ RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
+ DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
+
+ DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes);
+ return builderFor(input, rowConverter::toInternal, FlinkCompatibilityUtil.toTypeInfo(rowType))
+ .tableSchema(tableSchema);
+ }
+
+ /**
+ * Initialize a {@link FlinkSink.Builder} to export the data from an input data stream of {@link RowData}s into an
+ * iceberg table.
+ *
+ * @param input the source input data stream with {@link RowData}s.
+ * @return {@link FlinkSink.Builder} to connect the iceberg table.
+ */
+ public static FlinkSink.Builder forRowData(DataStream<RowData> input) {
+ return new FlinkSink.Builder().forRowData(input);
+ }
+
+ public static class Builder {
+ private Function<String, DataStream<RowData>> inputCreator = null;
+ private TableLoader tableLoader;
+ private Table table;
+ private TableSchema tableSchema;
+ private SyncRewriteDataFilesActionOption compact;
+ private boolean overwrite = false;
+ private boolean appendMode = false;
+ private DistributionMode distributionMode = null;
+ private Integer writeParallelism = null;
+ private boolean upsert = false;
+ private List<String> equalityFieldColumns = null;
+ private String uidPrefix = null;
+ private MetricOption metricOption = null;
+
+ private Builder() {
+ }
+
+ private FlinkSink.Builder forRowData(DataStream<RowData> newRowDataInput) {
+ this.inputCreator = ignored -> newRowDataInput;
+ return this;
+ }
+
+ private <T> FlinkSink.Builder forMapperOutputType(DataStream<T> input,
+ MapFunction<T, RowData> mapper,
+ TypeInformation<RowData> outputType) {
+ this.inputCreator = newUidPrefix -> {
+ if (newUidPrefix != null) {
+ return input.map(mapper, outputType)
+ .name(operatorName(newUidPrefix))
+ .uid(newUidPrefix + "-mapper");
+ } else {
+ return input.map(mapper, outputType);
+ }
+ };
+ return this;
+ }
+
+ /**
+ * This iceberg {@link Table} instance is used for initializing {@link IcebergStreamWriter}, which will write all
+ * the records into {@link DataFile}s and emit them to the downstream operator. Providing a table avoids loading
+ * the table again in each separate task.
+ *
+ * @param newTable the loaded iceberg table instance.
+ * @return {@link FlinkSink.Builder} to connect the iceberg table.
+ */
+ public FlinkSink.Builder table(Table newTable) {
+ this.table = newTable;
+ return this;
+ }
+
+ /**
+ * The table loader is used for loading tables in {@link IcebergFilesCommitter} lazily; we need this loader
+ * because {@link Table} is not serializable, so the table loaded via Builder#table cannot simply be reused in
+ * the remote task manager.
+ *
+ * @param newTableLoader to load iceberg table inside tasks.
+ * @return {@link FlinkSink.Builder} to connect the iceberg table.
+ */
+ public FlinkSink.Builder tableLoader(TableLoader newTableLoader) {
+ this.tableLoader = newTableLoader;
+ return this;
+ }
+
+ public FlinkSink.Builder tableSchema(TableSchema newTableSchema) {
+ this.tableSchema = newTableSchema;
+ return this;
+ }
+
+ public FlinkSink.Builder overwrite(boolean newOverwrite) {
+ this.overwrite = newOverwrite;
+ return this;
+ }
+
+ /**
+ * The appendMode property is used to insert data without equality field columns.
+ *
+ * @param appendMode whether to enable append-only mode.
+ * @return {@link FlinkSink.Builder} to connect the iceberg table.
+ */
+ public FlinkSink.Builder appendMode(boolean appendMode) {
+ this.appendMode = appendMode;
+ return this;
+ }
+
+ /**
+ * The compact option is used to compact the small files produced by each checkpoint.
+ *
+ * @param compact auto compaction options.
+ * @return {@link FlinkSink.Builder} to connect the iceberg table.
+ */
+ public FlinkSink.Builder compact(SyncRewriteDataFilesActionOption compact) {
+ this.compact = compact;
+ return this;
+ }
+
+ /**
+ * Add metric output for the iceberg writer.
+ *
+ * @param metricOption options describing how to report inlong metrics and audit.
+ * @return {@link FlinkSink.Builder} to connect the iceberg table.
+ */
+ public Builder metric(MetricOption metricOption) {
+ this.metricOption = metricOption;
+ return this;
+ }
+
+ /**
+ * Configure the write {@link DistributionMode} that the flink sink will use. Currently, flink supports
+ * {@link DistributionMode#NONE} and {@link DistributionMode#HASH}.
+ *
+ * @param mode to specify the write distribution mode.
+ * @return {@link FlinkSink.Builder} to connect the iceberg table.
+ */
+ public FlinkSink.Builder distributionMode(DistributionMode mode) {
+ Preconditions.checkArgument(!DistributionMode.RANGE.equals(mode),
+ "Flink does not support 'range' write distribution mode now.");
+ this.distributionMode = mode;
+ return this;
+ }
+
+ /**
+ * Configure the write parallelism for the iceberg stream writer.
+ *
+ * @param newWriteParallelism the number of parallel iceberg stream writers.
+ * @return {@link FlinkSink.Builder} to connect the iceberg table.
+ */
+ public FlinkSink.Builder writeParallelism(int newWriteParallelism) {
+ this.writeParallelism = newWriteParallelism;
+ return this;
+ }
+
+ /**
+ * All INSERT/UPDATE_AFTER events from the input stream will be transformed to UPSERT events, which means the
+ * old records are DELETEd and then the new records are INSERTed. In a partitioned table, the partition fields
+ * should be a subset of the equality fields; otherwise an old row located in partition-A could not be deleted
+ * by a new row located in partition-B.
+ *
+ * @param enabled indicates whether to transform all INSERT/UPDATE_AFTER events to UPSERT.
+ * @return {@link FlinkSink.Builder} to connect the iceberg table.
+ */
+ public FlinkSink.Builder upsert(boolean enabled) {
+ this.upsert = enabled;
+ return this;
+ }
+
+ /**
+ * Configure the equality field columns for an iceberg table that accepts CDC or UPSERT events.
+ *
+ * @param columns defines the iceberg table's key.
+ * @return {@link FlinkSink.Builder} to connect the iceberg table.
+ */
+ public FlinkSink.Builder equalityFieldColumns(List<String> columns) {
+ this.equalityFieldColumns = columns;
+ return this;
+ }
+
+ /**
+ * Set the uid prefix for FlinkSink operators. Note that FlinkSink internally consists of multiple operators
+ * (like writer, committer, dummy sink, etc.). The actual operator uid will be the prefix appended with a suffix
+ * like "uidPrefix-writer".
+ * <br><br>
+ * If provided, this prefix is also applied to operator names.
+ * <br><br>
+ * Flink auto generates operator uid if not set explicitly. It is a recommended
+ * <a href="https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/">
+ * best practice to set uids for all operators</a> before deploying to production. Flink has an option
+ * {@code pipeline.auto-generate-uid=false} to disable auto-generation and force explicit setting of all operator uids.
+ * <br><br>
+ * Be careful with setting this for an existing job, because now we are changing the operator uid from an
+ * auto-generated one to this new value. When deploying the change with a checkpoint, Flink won't be able to
+ * restore the previous Flink sink operator state (more specifically the committer operator state). You need to
+ * use {@code --allowNonRestoredState} to ignore the previous sink state. During restore, the Flink sink state
+ * is used to check whether the last commit was actually successful. {@code --allowNonRestoredState} can lead to
+ * data loss if the Iceberg commit failed in the last completed checkpoint.
+ *
+ * @param newPrefix prefix for Flink sink operator uid and name
+ * @return {@link FlinkSink.Builder} to connect the iceberg table.
+ */
+ public FlinkSink.Builder uidPrefix(String newPrefix) {
+ this.uidPrefix = newPrefix;
+ return this;
+ }
+
+ private <T> DataStreamSink<T> chainIcebergOperators() {
+ Preconditions.checkArgument(inputCreator != null,
+ "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
+ Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");
+
+ DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);
+
+ if (table == null) {
+ tableLoader.open();
+ try (TableLoader loader = tableLoader) {
+ this.table = loader.loadTable();
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
+ }
+ }
+
+ // Convert the requested flink table schema to flink row type.
+ RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+
+ // Distribute the records from input data stream based on the write.distribution-mode.
+ DataStream<RowData> distributeStream = distributeDataStream(
+ rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
+
+ // Add parallel writers that append rows to files
+ SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType);
+
+ // Add single-parallelism committer that commits files
+ // after successful checkpoint or end of input
+ SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);
+
+ // Add dummy discard sink
+ return appendDummySink(committerStream);
+ }
+
+ /**
+ * Append the iceberg sink operators to write records to iceberg table.
+ *
+ * @return {@link DataStreamSink} for sink.
+ * @deprecated this will be removed in 0.14.0; use {@link #append()} because its returned {@link DataStreamSink}
+ * carries the correct data type.
+ */
+ @Deprecated
+ public DataStreamSink<RowData> build() {
+ return chainIcebergOperators();
+ }
+
+ /**
+ * Append the iceberg sink operators to write records to iceberg table.
+ *
+ * @return {@link DataStreamSink} for sink.
+ */
+ public DataStreamSink<Void> append() {
+ return chainIcebergOperators();
+ }
+
+ private String operatorName(String suffix) {
+ return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> DataStreamSink<T> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
+ DataStreamSink<T> resultStream = committerStream
+ .addSink(new DiscardingSink())
+ .name(operatorName(String.format("IcebergSink %s", this.table.name())))
+ .setParallelism(1);
+ if (uidPrefix != null) {
+ resultStream = resultStream.uid(uidPrefix + "-dummysink");
+ }
+ return resultStream;
+ }
+
+ private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
+ IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite, compact);
+ SingleOutputStreamOperator<Void> committerStream = writerStream
+ .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
+ .setParallelism(1)
+ .setMaxParallelism(1);
+ if (uidPrefix != null) {
+ committerStream = committerStream.uid(uidPrefix + "-committer");
+ }
+ return committerStream;
+ }
+
+ private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
+ // Find out the equality field id list based on the user-provided equality field column names.
+ List<Integer> equalityFieldIds = Lists.newArrayList();
+ if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
+ for (String column : equalityFieldColumns) {
+ org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
+ Preconditions.checkNotNull(field, "Missing required equality field column '%s' "
+ + "in table schema %s", column, table.schema());
+ equalityFieldIds.add(field.fieldId());
+ }
+ }
+
+ // Fall back to the upsert mode parsed from table properties if it is not specified at the job level.
+ // Upsert is only valid when appendMode is disabled.
+ boolean upsertMode = (upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+ UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT)) && !appendMode;
+
+ // Validate the equality fields and partition fields if we enable the upsert mode.
+ if (upsertMode) {
+ Preconditions.checkState(!overwrite,
+ "OVERWRITE mode shouldn't be enabled when configuring to use UPSERT data stream.");
+ Preconditions.checkState(!equalityFieldIds.isEmpty(),
+ "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+ if (!table.spec().isUnpartitioned()) {
+ for (PartitionField partitionField : table.spec().fields()) {
+ Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
+ "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
+ partitionField, equalityFieldColumns);
+ }
+ }
+ }
+
+ IcebergStreamWriter<RowData> streamWriter =
+ createStreamWriter(table, flinkRowType, equalityFieldIds, upsertMode, appendMode, metricOption);
+
+ int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
+ SingleOutputStreamOperator<WriteResult> writerStream = input
+ .transform(operatorName(ICEBERG_STREAM_WRITER_NAME),
+ TypeInformation.of(WriteResult.class),
+ streamWriter)
+ .setParallelism(parallelism);
+ if (uidPrefix != null) {
+ writerStream = writerStream.uid(uidPrefix + "-writer");
+ }
+ return writerStream;
+ }
+
+ private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
+ Map<String, String> properties,
+ PartitionSpec partitionSpec,
+ Schema iSchema,
+ RowType flinkRowType) {
+ DistributionMode writeMode;
+ if (distributionMode == null) {
+ // Fall back to the distribution mode parsed from table properties if it is not specified at the job level.
+ String modeName = PropertyUtil.propertyAsString(properties,
+ WRITE_DISTRIBUTION_MODE,
+ WRITE_DISTRIBUTION_MODE_NONE);
+
+ writeMode = DistributionMode.fromName(modeName);
+ } else {
+ writeMode = distributionMode;
+ }
+
+ switch (writeMode) {
+ case NONE:
+ return input;
+
+ case HASH:
+ if (partitionSpec.isUnpartitioned()) {
+ return input;
+ } else {
+ return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+ }
+
+ case RANGE:
+ LOG.warn("Falling back to 'none' distribution mode because {}={} is not supported in flink now",
+ WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
+ return input;
+
+ default:
+ throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
+ }
+ }
+ }
+
+ static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
+ if (requestedSchema != null) {
+ // Convert the flink schema to an iceberg schema first, then reassign ids to match the existing
+ // iceberg schema.
+ Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
+ TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
+
+ // We use this flink schema to read values from RowData. Flink's TINYINT and SMALLINT are promoted to
+ // iceberg INTEGER, which means that if we used iceberg's table schema to read a TINYINT (backed by 1 byte),
+ // we would read 4 bytes rather than 1 byte and mess up the byte array in BinaryRowData. So here we must
+ // use the flink schema.
+ return (RowType) requestedSchema.toRowDataType().getLogicalType();
+ } else {
+ return FlinkSchemaUtil.convert(schema);
+ }
+ }
+
+ static IcebergStreamWriter<RowData> createStreamWriter(Table table,
+ RowType flinkRowType,
+ List<Integer> equalityFieldIds,
+ boolean upsert,
+ boolean appendMode,
+ MetricOption metricOption) {
+ Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
+ Map<String, String> props = table.properties();
+ long targetFileSize = getTargetFileSizeBytes(props);
+ FileFormat fileFormat = getFileFormat(props);
+
+ Table serializableTable = SerializableTable.copyOf(table);
+ TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
+ serializableTable, flinkRowType, targetFileSize,
+ fileFormat, equalityFieldIds, upsert, appendMode);
+
+ return new IcebergStreamWriter<>(table.name(), taskWriterFactory, metricOption);
+ }
+
+ private static FileFormat getFileFormat(Map<String, String> properties) {
+ String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+ return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+ }
+
+ private static long getTargetFileSizeBytes(Map<String, String> properties) {
+ return PropertyUtil.propertyAsLong(properties,
+ WRITE_TARGET_FILE_SIZE_BYTES,
+ WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ }
+}
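For reference, a minimal usage sketch of the builder above, assuming an existing DataStream<RowData> named
rowDataStream and a Hadoop-style table location; the path, key column, and uid prefix are illustrative only:

    // Sketch: wire the sink onto an upstream DataStream<RowData> in upsert mode.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl");
    FlinkSink.forRowData(rowDataStream)
        .tableLoader(tableLoader)
        .equalityFieldColumns(Arrays.asList("id"))   // table key used for equality deletes
        .upsert(true)
        .writeParallelism(2)
        .uidPrefix("dlc-iceberg-sink")
        .append();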
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergFilesCommitter.java
similarity index 81%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergFilesCommitter.java
index 8e5315cad..2ee5ccf7f 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.sort.iceberg.sink;
+package org.apache.inlong.sort.iceberg.flink.sink;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -48,6 +48,10 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.iceberg.flink.CompactTableProperties;
+import org.apache.inlong.sort.iceberg.flink.actions.RewriteResult;
+import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesAction;
+import org.apache.inlong.sort.iceberg.flink.actions.SyncRewriteDataFilesActionOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,8 +61,16 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.Optional;
import java.util.SortedMap;
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.COMPACT_ENABLED;
+import static org.apache.inlong.sort.iceberg.flink.CompactTableProperties.COMPACT_ENABLED_DEFAULT;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ * Add small file compaction action.
+ */
class IcebergFilesCommitter extends AbstractStreamOperator<Void>
implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {
@@ -69,9 +81,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
private static final String FLINK_JOB_ID = "flink.job-id";
- // The max checkpoint id we've committed to iceberg table. As the flink's checkpoint is always increasing, so we
- // could correctly commit all the data files whose checkpoint id is greater than the max committed one to iceberg
- // table, for avoiding committing the same data files twice. This id will be attached to iceberg's meta when
+ // The max checkpoint id we've committed to iceberg table. As the flink's checkpoint is always increasing,
+ // so we could correctly commit all the data files whose checkpoint id is greater than the max committed one to
+ // iceberg table, for avoiding committing the same data files twice. This id will be attached to iceberg's meta when
// committing the iceberg transaction.
private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
@@ -110,9 +122,17 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
- IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions) {
+ // compact file action
+ private SyncRewriteDataFilesActionOption compactOption;
+ private transient SyncRewriteDataFilesAction compactAction;
+
+ IcebergFilesCommitter(
+ TableLoader tableLoader,
+ boolean replacePartitions,
+ SyncRewriteDataFilesActionOption compactOption) {
this.tableLoader = tableLoader;
this.replacePartitions = replacePartitions;
+ this.compactOption = compactOption;
}
@Override
@@ -124,14 +144,24 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
this.tableLoader.open();
this.table = tableLoader.loadTable();
- maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
+ // compact file
+ if (PropertyUtil.propertyAsBoolean(table.properties(), COMPACT_ENABLED, COMPACT_ENABLED_DEFAULT)) {
+ compactAction = new SyncRewriteDataFilesAction(compactOption);
+ CompactTableProperties.TABLE_AUTO_COMPACT_PROPERTIES.stream()
+ .forEach(k -> Optional.ofNullable(table.properties().get(k))
+ .ifPresent(v -> compactAction.option(k, v)));
+ }
+
+ maxContinuousEmptyCommits =
+ PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
Preconditions.checkArgument(maxContinuousEmptyCommits > 0,
MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive");
int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
int attemptId = getRuntimeContext().getAttemptNumber();
- this.manifestOutputFileFactory = FlinkManifestUtil
- .createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+ String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
+ this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorUniqueId,
+ subTaskId, attemptId);
this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
@@ -141,9 +171,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
"Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
- // Since flink's checkpoint id will start from the max-committed-checkpoint-id + 1 in the new flink job even
- // if it's restored from a snapshot created by another different flink job, so it's safe to assign the max
- // committed checkpoint id from restored flink job to the current flink job.
+ // Since flink's checkpoint id will start from the max-committed-checkpoint-id + 1 in the new flink job
+ // even if it's restored from a snapshot created by another different flink job, so it's safe to assign
+ // the max committed checkpoint id from restored flink job to the current flink job.
this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
@@ -189,6 +219,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
if (checkpointId > maxCommittedCheckpointId) {
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
+ // trigger a small file compaction after each successful checkpoint commit
+ if (compactAction != null) {
+ RewriteResult result = compactAction.execute();
+ LOG.info("compact action result: {}", result);
+ }
}
}
@@ -234,8 +269,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
.add("checkpointId", checkpointId)
.add("manifestPath", manifest.path())
.toString();
- LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink "
- + "manifests: {}", details, e);
+ LOG.warn("The iceberg transaction has been committed, but we failed to clean the "
+ + "temporary flink manifests: {}", details, e);
}
}
}
@@ -244,21 +279,23 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
long checkpointId) {
// Partition overwrite does not support delete files.
int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
- Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions with delete files.");
+ Preconditions.checkState(deleteFilesNum == 0,
+ "Cannot overwrite partitions with delete files.");
// Commit the overwrite transaction.
ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
int numFiles = 0;
for (WriteResult result : pendingResults.values()) {
- Preconditions.checkState(result.referencedDataFiles().length == 0,
- "Should have no referenced data files.");
+ Preconditions.checkState(
+ result.referencedDataFiles().length == 0, "Should have no referenced data files.");
numFiles += result.dataFiles().length;
Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
}
- commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId);
+ commitOperation(dynamicOverwrite, numFiles, 0,
+ "dynamic partition overwrite", newFlinkJobId, checkpointId);
}
private void commitDeltaTxn(
@@ -271,8 +308,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
int numFiles = 0;
for (WriteResult result : pendingResults.values()) {
- Preconditions.checkState(result.referencedDataFiles().length == 0,
- "Should have no referenced data files.");
+ Preconditions.checkState(
+ result.referencedDataFiles().length == 0, "Should have no referenced data files.");
numFiles += result.dataFiles().length;
Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
@@ -288,11 +325,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
WriteResult result = e.getValue();
// Row delta validations are not needed for streaming changes that write equality deletes. Equality
- // deletes are applied to data in all previous sequence numbers, so retries may push deletes further in
- // the future, but do not affect correctness. Position deletes committed to the table in this path are
- // used only to delete rows from data files that are being added in this commit. There is no way for
- // data files added along with the delete files to be concurrently removed, so there is no need to
- // validate the files referenced by the position delete files that are being committed.
+ // deletes are applied to data in all previous sequence numbers, so retries may push deletes further
+ // in the future, but do not affect correctness. Position deletes committed to the table in this path
+ // are used only to delete rows from data files that are being added in this commit. There is no way
+ // for data files added along with the delete files to be concurrently removed, so there is no need
+ // to validate the files referenced by the position delete files that are being committed.
RowDelta rowDelta = table.newRowDelta();
int numDataFiles = result.dataFiles().length;
@@ -335,8 +372,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
}
/**
- * Write all the complete data files to a newly created manifest file and return the manifest's avro serialized
- * bytes.
+ * Write all the complete data files to a newly created manifest file and return the manifest's avro
+ * serialized bytes.
*/
private byte[] writeToManifest(long checkpointId) throws IOException {
if (writeResultsOfCurrentCkpt.isEmpty()) {
@@ -351,7 +388,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
}
@Override
- public void dispose() throws Exception {
+ public void close() throws Exception {
if (tableLoader != null) {
tableLoader.close();
}
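A minimal sketch of turning this compaction path on for an existing table, assuming the `write.compact.enable`
property named in the class comments (the UpdateProperties calls are standard Iceberg API):

    // Sketch: enable the per-checkpoint small file compaction on a loaded Iceberg table.
    // "write.compact.enable" is the property read as COMPACT_ENABLED above.
    table.updateProperties()
        .set("write.compact.enable", "true")
        .commit();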
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
similarity index 82%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
index 75eca46c5..9f7cffbcf 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.sort.iceberg.sink;
+package org.apache.inlong.sort.iceberg.flink.sink;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
@@ -28,14 +28,16 @@ import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
import javax.annotation.Nullable;
import java.io.IOException;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
-
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {
@@ -43,8 +45,7 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
private final String fullTableName;
private final TaskWriterFactory<T> taskWriterFactory;
- private final String inlongMetric;
- private final String auditHostAndPorts;
+ private final MetricOption metricOption;
private transient TaskWriter<T> writer;
private transient int subTaskId;
@@ -52,15 +53,12 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
@Nullable
private transient SinkMetricData metricData;
- IcebergStreamWriter(
- String fullTableName,
+ IcebergStreamWriter(String fullTableName,
TaskWriterFactory<T> taskWriterFactory,
- String inlongMetric,
- String auditHostAndPorts) {
+ @Nullable MetricOption metricOption) {
this.fullTableName = fullTableName;
this.taskWriterFactory = taskWriterFactory;
- this.inlongMetric = inlongMetric;
- this.auditHostAndPorts = auditHostAndPorts;
+ this.metricOption = metricOption;
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@@ -76,13 +74,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
this.writer = taskWriterFactory.create();
// Initialize metric
- if (inlongMetric != null) {
- String[] inlongMetricArray = inlongMetric.split(DELIMITER);
- String inlongGroupId = inlongMetricArray[0];
- String inlongStreamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- metricData = new SinkMetricData(
- inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
+ if (metricOption != null) {
+ metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
@@ -110,8 +103,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
}
@Override
- public void dispose() throws Exception {
- super.dispose();
+ public void close() throws Exception {
+ super.close();
if (writer != null) {
writer.close();
writer = null;
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/ManifestOutputFileFactory.java
similarity index 84%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/ManifestOutputFileFactory.java
index f13ff4237..e01fd4d0d 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/ManifestOutputFileFactory.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.sort.iceberg.sink;
+package org.apache.inlong.sort.iceberg.flink.sink;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.TableOperations;
@@ -28,6 +28,9 @@ import org.apache.iceberg.relocated.com.google.common.base.Strings;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
class ManifestOutputFileFactory {
// Users could define their own flink manifests directory by setting this value in table properties.
static final String FLINK_MANIFEST_LOCATION = "flink.manifests.location";
@@ -36,23 +39,25 @@ class ManifestOutputFileFactory {
private final FileIO io;
private final Map<String, String> props;
private final String flinkJobId;
+ private final String operatorUniqueId;
private final int subTaskId;
private final long attemptNumber;
private final AtomicInteger fileCount = new AtomicInteger(0);
ManifestOutputFileFactory(TableOperations ops, FileIO io, Map<String, String> props,
- String flinkJobId, int subTaskId, long attemptNumber) {
+ String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) {
this.ops = ops;
this.io = io;
this.props = props;
this.flinkJobId = flinkJobId;
+ this.operatorUniqueId = operatorUniqueId;
this.subTaskId = subTaskId;
this.attemptNumber = attemptNumber;
}
private String generatePath(long checkpointId) {
- return FileFormat.AVRO.addExtension(String.format("%s-%05d-%d-%d-%05d", flinkJobId, subTaskId,
- attemptNumber, checkpointId, fileCount.incrementAndGet()));
+ return FileFormat.AVRO.addExtension(String.format("%s-%s-%05d-%d-%d-%05d", flinkJobId, operatorUniqueId,
+ subTaskId, attemptNumber, checkpointId, fileCount.incrementAndGet()));
}
OutputFile create(long checkpointId) {
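With the operator unique id added, a generated manifest name follows the pattern
<flinkJobId>-<operatorUniqueId>-<subTaskId, 5 digits>-<attemptNumber>-<checkpointId>-<fileCount, 5 digits>.avro,
e.g. (all values hypothetical) a1b2c3-writer-1-00003-0-42-00001.avro, which distinguishes manifests written by
different sink operators within the same Flink job.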
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionKeySelector.java
similarity index 95%
copy from inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java
copy to inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionKeySelector.java
index 1c2cf61b1..bc4286bbb 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionKeySelector.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.inlong.sort.iceberg.sink;
+package org.apache.inlong.sort.iceberg.flink.sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.table.data.RowData;
@@ -31,6 +31,8 @@ import org.apache.iceberg.flink.sink.FlinkSink;
/**
* Create a {@link KeySelector} to shuffle by partition key, then each partition/bucket will be written by only one
* task. That will reduce lots of small files in partitioned fanout write policy for {@link FlinkSink}.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
*/
class PartitionKeySelector implements KeySelector<RowData, String> {
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionedDeltaWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionedDeltaWriter.java
new file mode 100644
index 000000000..a482e0ba2
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionedDeltaWriter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
+
+ private final PartitionKey partitionKey;
+
+ private final Map<PartitionKey, RowDataDeltaWriter> writers = Maps.newHashMap();
+
+ PartitionedDeltaWriter(PartitionSpec spec,
+ FileFormat format,
+ FileAppenderFactory<RowData> appenderFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSize,
+ Schema schema,
+ RowType flinkSchema,
+ List<Integer> equalityFieldIds,
+ boolean upsert) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+ upsert);
+ this.partitionKey = new PartitionKey(spec, schema);
+ }
+
+ @Override
+ RowDataDeltaWriter route(RowData row) {
+ partitionKey.partition(wrapper().wrap(row));
+
+ RowDataDeltaWriter writer = writers.get(partitionKey);
+ if (writer == null) {
+ // NOTICE: we need to copy a new partition key here to avoid messing up the keys in writers.
+ PartitionKey copiedKey = partitionKey.copy();
+ writer = new RowDataDeltaWriter(copiedKey);
+ writers.put(copiedKey, writer);
+ }
+
+ return writer;
+ }
+
+ @Override
+ public void close() {
+ try {
+ Tasks.foreach(writers.values())
+ .throwFailureWhenFinished()
+ .noRetry()
+ .run(RowDataDeltaWriter::close, IOException.class);
+
+ writers.clear();
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close equality delta writer", e);
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
new file mode 100644
index 000000000..36240760f
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitionedFanoutWriter;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ArrayUtil;
+
+import java.util.List;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ * Add an option `sink.ignore.changelog` to support insert-only mode without equalityFieldIds.
+ */
+public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
+ private final Table table;
+ private final Schema schema;
+ private final RowType flinkSchema;
+ private final PartitionSpec spec;
+ private final FileIO io;
+ private final long targetFileSizeBytes;
+ private final FileFormat format;
+ private final List<Integer> equalityFieldIds;
+ private final boolean upsert;
+ private final boolean appendMode;
+ private final FileAppenderFactory<RowData> appenderFactory;
+
+ private transient OutputFileFactory outputFileFactory;
+
+ public RowDataTaskWriterFactory(Table table,
+ RowType flinkSchema,
+ long targetFileSizeBytes,
+ FileFormat format,
+ List<Integer> equalityFieldIds,
+ boolean upsert,
+ boolean appendMode) {
+ this.table = table;
+ this.schema = table.schema();
+ this.flinkSchema = flinkSchema;
+ this.spec = table.spec();
+ this.io = table.io();
+ this.targetFileSizeBytes = targetFileSizeBytes;
+ this.format = format;
+ this.equalityFieldIds = equalityFieldIds;
+ this.upsert = upsert;
+ this.appendMode = appendMode;
+
+ if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
+ this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+ } else {
+ // TODO provide the ability to customize the equality-delete row schema.
+ this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
+ ArrayUtil.toIntArray(equalityFieldIds), schema, null);
+ }
+ }
+
+ @Override
+ public void initialize(int taskId, int attemptId) {
+ this.outputFileFactory = OutputFileFactory.builderFor(table, taskId, attemptId).build();
+ }
+
+ @Override
+ public TaskWriter<RowData> create() {
+ Preconditions.checkNotNull(outputFileFactory,
+ "The outputFileFactory shouldn't be null if we have invoked the initialize().");
+
+ if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
+ // Initialize a task writer to write INSERT only.
+ if (spec.isUnpartitioned()) {
+ return new UnpartitionedWriter<>(
+ spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes);
+ } else {
+ return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
+ io, targetFileSizeBytes, schema, flinkSchema);
+ }
+ } else {
+ // Initialize a task writer to write both INSERT and equality DELETE.
+ if (spec.isUnpartitioned()) {
+ return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
+ targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
+ } else {
+ return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
+ targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
+ }
+ }
+ }
+
+ private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter<RowData> {
+
+ private final PartitionKey partitionKey;
+ private final RowDataWrapper rowDataWrapper;
+
+ RowDataPartitionedFanoutWriter(
+ PartitionSpec spec, FileFormat format, FileAppenderFactory<RowData> appenderFactory,
+ OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema,
+ RowType flinkSchema) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+ this.partitionKey = new PartitionKey(spec, schema);
+ this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+ }
+
+ @Override
+ protected PartitionKey partition(RowData row) {
+ partitionKey.partition(rowDataWrapper.wrap(row));
+ return partitionKey;
+ }
+ }
+}
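For quick reference, a sketch of which writer create() above hands back for each combination of the flags shown
(this only restates the branches in the code):

    appendMode or no equality fields, unpartitioned table  -> UnpartitionedWriter            (INSERT only)
    appendMode or no equality fields, partitioned table    -> RowDataPartitionedFanoutWriter (INSERT only)
    equality fields set, unpartitioned table               -> UnpartitionedDeltaWriter       (INSERT + equality DELETE)
    equality fields set, partitioned table                 -> PartitionedDeltaWriter         (INSERT + equality DELETE)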
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/UnpartitionedDeltaWriter.java
new file mode 100644
index 000000000..9e8e53999
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/UnpartitionedDeltaWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.flink.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
+ private final RowDataDeltaWriter writer;
+
+ UnpartitionedDeltaWriter(PartitionSpec spec,
+ FileFormat format,
+ FileAppenderFactory<RowData> appenderFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSize,
+ Schema schema,
+ RowType flinkSchema,
+ List<Integer> equalityFieldIds,
+ boolean upsert) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+ upsert);
+ this.writer = new RowDataDeltaWriter(null);
+ }
+
+ @Override
+ RowDataDeltaWriter route(RowData row) {
+ return writer;
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+}
diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/iceberg-dlc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..db2caa084
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.iceberg.flink.FlinkDynamicTableFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
index 2056a8d02..e81a5552d 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalog.java
@@ -89,6 +89,8 @@ import java.util.stream.Collectors;
* </p>
* The Iceberg table manages its partitions by itself. The partition of the Iceberg table is independent of the
* partition of Flink.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
*/
public class FlinkCatalog extends AbstractCatalog {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
index f4bb7d49f..6a02dc81e 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
@@ -57,6 +57,8 @@ import java.util.Map;
* </p>
* To use a custom catalog that is not a Hive or Hadoop catalog, extend this class and override
* {@link #createCatalogLoader(String, Map, Configuration)}.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
*/
public class FlinkCatalogFactory implements CatalogFactory {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index e10626148..f12bd1b56 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -52,7 +52,7 @@ import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
/**
- * Copy from org.apache.iceberg.flink:iceberg-flink-runtime-1.13:0.13.1
+ * Copy from org.apache.iceberg.flink:iceberg-flink-runtime-1.13:0.13.2
*
* <p>
* Factory for creating configured instances of {@link IcebergTableSource} and {@link
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index b5f3bef9e..088622278 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -37,11 +37,17 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ * Add an option `sink.ignore.changelog` to support insert-only mode without a primary key.
+ * Add options `inlong.metric` and `inlong.audit` to support collecting inlong metrics and audit data.
+ */
public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
private static final Logger LOG = LoggerFactory.getLogger(IcebergTableSink.class);
@@ -80,6 +86,9 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
.tableSchema(tableSchema)
.equalityFieldColumns(equalityColumns)
.overwrite(overwrite)
+ .appendMode(Boolean.valueOf(
+ Optional.ofNullable(catalogTable.getOptions().get(ICEBERG_IGNORE_ALL_CHANGELOG.key()))
+ .orElse(ICEBERG_IGNORE_ALL_CHANGELOG.defaultValue().toString())))
.metric(catalogTable.getOptions().getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue()),
catalogTable.getOptions().getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue()))
.append();
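A minimal sketch of the catalog-table option that drives the appendMode wiring above; the key string comes from the
`sink.ignore.changelog` option named in the class comment, and the surrounding catalog/DDL plumbing is assumed:

    // Sketch: mark the sink table as insert-only so all changelog events are appended as plain inserts.
    Map<String, String> options = new HashMap<>();
    options.put("sink.ignore.changelog", "true");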
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/BaseDeltaTaskWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/BaseDeltaTaskWriter.java
new file mode 100644
index 000000000..40b174cd8
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/BaseDeltaTaskWriter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.data.RowDataProjection;
+import org.apache.iceberg.io.BaseTaskWriter;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
+
+ private final Schema schema;
+ private final Schema deleteSchema;
+ private final RowDataWrapper wrapper;
+ private final RowDataWrapper keyWrapper;
+ private final RowDataProjection keyProjection;
+ private final boolean upsert;
+
+ BaseDeltaTaskWriter(PartitionSpec spec,
+ FileFormat format,
+ FileAppenderFactory<RowData> appenderFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSize,
+ Schema schema,
+ RowType flinkSchema,
+ List<Integer> equalityFieldIds,
+ boolean upsert) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+ this.schema = schema;
+ this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
+ this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+ this.upsert = upsert;
+ this.keyWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
+ this.keyProjection = RowDataProjection.create(schema, deleteSchema);
+ }
+
+ abstract RowDataDeltaWriter route(RowData row);
+
+ RowDataWrapper wrapper() {
+ return wrapper;
+ }
+
+ @Override
+ public void write(RowData row) throws IOException {
+ RowDataDeltaWriter writer = route(row);
+
+ switch (row.getRowKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+ if (upsert) {
+ writer.deleteKey(keyProjection.wrap(row));
+ }
+ writer.write(row);
+ break;
+
+ case UPDATE_BEFORE:
+ if (upsert) {
+ break; // UPDATE_BEFORE is not needed in upsert mode; skip it to avoid deleting the same row twice
+ }
+ writer.delete(row);
+ break;
+ case DELETE:
+ writer.delete(row);
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
+ }
+ }
+
+ protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
+ RowDataDeltaWriter(PartitionKey partition) {
+ super(partition, schema, deleteSchema);
+ }
+
+ @Override
+ protected StructLike asStructLike(RowData data) {
+ return wrapper.wrap(data);
+ }
+
+ @Override
+ protected StructLike asStructLikeKey(RowData data) {
+ return keyWrapper.wrap(data);
+ }
+ }
+}
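
For context on the changelog handling above: a minimal, self-contained sketch (not part of this commit; the class and helper names are illustrative) of how BaseDeltaTaskWriter#write maps Flink row kinds to writer operations in upsert and non-upsert mode.

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.types.RowKind;

    public class DeltaWriteSemanticsSketch {

        // Mirrors the switch in BaseDeltaTaskWriter#write: with upsert enabled,
        // INSERT/UPDATE_AFTER first issue an equality delete on the key and then write;
        // UPDATE_BEFORE is skipped so the same row is not deleted twice.
        static String describe(RowData row, boolean upsert) {
            switch (row.getRowKind()) {
                case INSERT:
                case UPDATE_AFTER:
                    return upsert ? "deleteKey + write" : "write";
                case UPDATE_BEFORE:
                    return upsert ? "skip" : "delete";
                case DELETE:
                    return "delete";
                default:
                    throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
            }
        }

        public static void main(String[] args) {
            GenericRowData row = GenericRowData.of(1, StringData.fromString("a"));
            row.setRowKind(RowKind.UPDATE_AFTER);
            System.out.println(describe(row, true));   // deleteKey + write
            System.out.println(describe(row, false));  // write
        }
    }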
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
index 6b52dff8b..dae992042 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
@@ -25,6 +25,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import java.util.List;
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
class DeltaManifests {
private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0];
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
index fb2187d76..1ae2bcbe5 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
@@ -30,6 +30,9 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
private static final int VERSION_1 = 1;
private static final int VERSION_2 = 2;
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
index 1cbe0d70b..46d7beec9 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
@@ -38,6 +38,9 @@ import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
class FlinkManifestUtil {
private static final int FORMAT_V2 = 2;
private static final Long DUMMY_SNAPSHOT_ID = 0L;
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index d86857338..65929bc34 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -42,7 +42,6 @@ import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
@@ -69,6 +68,11 @@ import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ * Add an option `sink.ignore.changelog` to support insert-only mode without a primary key.
+ * Add options `inlong.metric` and `inlong.audit` to support collecting InLong metrics and audit data.
+ */
public class FlinkSink {
private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
@@ -129,6 +133,7 @@ public class FlinkSink {
private Table table;
private TableSchema tableSchema;
private boolean overwrite = false;
+ private boolean appendMode = false;
private DistributionMode distributionMode = null;
private Integer writeParallelism = null;
private boolean upsert = false;
@@ -196,6 +201,17 @@ public class FlinkSink {
return this;
}
+ /**
+ * The appendMode property is used to insert data without equality field columns.
+ *
+ * @param appendMode whether to enable append-only (insert-only) mode.
+ * @return {@link FlinkSink.Builder} to connect the iceberg table.
+ */
+ public FlinkSink.Builder appendMode(boolean appendMode) {
+ this.appendMode = appendMode;
+ return this;
+ }
+
/**
* Add metric output for iceberg writer
* @param inlongMetric
@@ -381,8 +397,9 @@ public class FlinkSink {
}
// Fallback to use upsert mode parsed from table properties if don't specify in job level.
- boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
- UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
+ // Only if not appendMode, upsert can be valid.
+ boolean upsertMode = (upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+ UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT)) && !appendMode;
// Validate the equality fields and partition fields if we enable the upsert mode.
if (upsertMode) {
@@ -400,7 +417,7 @@ public class FlinkSink {
}
IcebergStreamWriter<RowData> streamWriter = createStreamWriter(
- table, flinkRowType, equalityFieldIds, upsertMode, inlongMetric, auditHostAndPorts);
+ table, flinkRowType, equalityFieldIds, upsertMode, appendMode, inlongMetric, auditHostAndPorts);
int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -474,6 +491,7 @@ public class FlinkSink {
RowType flinkRowType,
List<Integer> equalityFieldIds,
boolean upsert,
+ boolean appendMode,
String inlongMetric,
String auditHostAndPorts) {
Preconditions.checkArgument(table != null, "Iceberg table should't be null");
@@ -484,7 +502,7 @@ public class FlinkSink {
Table serializableTable = SerializableTable.copyOf(table);
TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
serializableTable, flinkRowType, targetFileSize,
- fileFormat, equalityFieldIds, upsert);
+ fileFormat, equalityFieldIds, upsert, appendMode);
return new IcebergStreamWriter<>(table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts);
}
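
As a usage illustration of the new appendMode flag: a sketch under assumptions, not taken from this commit. It assumes the copied builder keeps the upstream forRowData/tableLoader/append entry points from iceberg-flink 0.13.2; the table location and the input stream are hypothetical, and the job needs an existing Iceberg table to actually run.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.inlong.sort.iceberg.sink.FlinkSink;

    public class AppendModeSinkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical insert-only stream; a real job would read from an upstream extract node.
            DataStream<RowData> rows = env.fromElements(
                    (RowData) GenericRowData.of(1, StringData.fromString("a")));

            // Hypothetical warehouse location; the Iceberg table must already exist.
            TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl");

            FlinkSink.forRowData(rows)
                    .tableLoader(tableLoader)
                    .appendMode(true)   // new in this commit: ignore changelog semantics, write INSERT only
                    .append();

            env.execute("iceberg append-only sink");
        }
    }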
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
index 8e5315cad..7942ef77c 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
@@ -59,6 +59,9 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
class IcebergFilesCommitter extends AbstractStreamOperator<Void>
implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index 75eca46c5..79df9049f 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -36,6 +36,9 @@ import java.io.IOException;
import static org.apache.inlong.sort.base.Constants.DELIMITER;
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
index f13ff4237..97f4f3f02 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
@@ -28,6 +28,9 @@ import org.apache.iceberg.relocated.com.google.common.base.Strings;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
class ManifestOutputFileFactory {
// Users could define their own flink manifests directory by setting this value in table properties.
static final String FLINK_MANIFEST_LOCATION = "flink.manifests.location";
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java
index 1c2cf61b1..cd0442b9a 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java
@@ -31,6 +31,8 @@ import org.apache.iceberg.flink.sink.FlinkSink;
/**
* Create a {@link KeySelector} to shuffle by partition key, then each partition/bucket will be wrote by only one
* task. That will reduce lots of small files in partitioned fanout write policy for {@link FlinkSink}.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
*/
class PartitionKeySelector implements KeySelector<RowData, String> {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionedDeltaWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionedDeltaWriter.java
new file mode 100644
index 000000000..230e40f30
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionedDeltaWriter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
+
+ private final PartitionKey partitionKey;
+
+ private final Map<PartitionKey, RowDataDeltaWriter> writers = Maps.newHashMap();
+
+ PartitionedDeltaWriter(PartitionSpec spec,
+ FileFormat format,
+ FileAppenderFactory<RowData> appenderFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSize,
+ Schema schema,
+ RowType flinkSchema,
+ List<Integer> equalityFieldIds,
+ boolean upsert) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+ upsert);
+ this.partitionKey = new PartitionKey(spec, schema);
+ }
+
+ @Override
+ RowDataDeltaWriter route(RowData row) {
+ partitionKey.partition(wrapper().wrap(row));
+
+ RowDataDeltaWriter writer = writers.get(partitionKey);
+ if (writer == null) {
+ // NOTICE: copy the partition key here; the reused key would otherwise corrupt entries already in the writers map.
+ PartitionKey copiedKey = partitionKey.copy();
+ writer = new RowDataDeltaWriter(copiedKey);
+ writers.put(copiedKey, writer);
+ }
+
+ return writer;
+ }
+
+ @Override
+ public void close() {
+ try {
+ Tasks.foreach(writers.values())
+ .throwFailureWhenFinished()
+ .noRetry()
+ .run(RowDataDeltaWriter::close, IOException.class);
+
+ writers.clear();
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close equality delta writer", e);
+ }
+ }
+}
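
The key copy in route() above matters because the single PartitionKey instance is reused for every row. A generic Java illustration (unrelated to the Iceberg classes; all names are made up) of why a reused, mutable map key must be copied before it is stored:

    import java.util.HashMap;
    import java.util.Map;

    public class ReusedKeySketch {

        // A mutable key whose hashCode/equals depend on mutable state,
        // like the reused PartitionKey in PartitionedDeltaWriter#route.
        static final class MutableKey {
            int value;

            @Override
            public boolean equals(Object o) {
                return o instanceof MutableKey && ((MutableKey) o).value == value;
            }

            @Override
            public int hashCode() {
                return value;
            }
        }

        public static void main(String[] args) {
            Map<MutableKey, String> writers = new HashMap<>();
            MutableKey reused = new MutableKey();
            reused.value = 1;
            writers.put(reused, "writer-1");

            reused.value = 2;                        // same object reused for the next partition
            System.out.println(writers.get(reused)); // null: the stored entry can no longer be found
        }
    }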
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
new file mode 100644
index 000000000..831ef6a4a
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitionedFanoutWriter;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ArrayUtil;
+
+import java.util.List;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ * Add an option `sink.ignore.changelog` to support insert-only mode without equalityFieldIds.
+ */
+public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
+ private final Table table;
+ private final Schema schema;
+ private final RowType flinkSchema;
+ private final PartitionSpec spec;
+ private final FileIO io;
+ private final long targetFileSizeBytes;
+ private final FileFormat format;
+ private final List<Integer> equalityFieldIds;
+ private final boolean upsert;
+ private final boolean appendMode;
+ private final FileAppenderFactory<RowData> appenderFactory;
+
+ private transient OutputFileFactory outputFileFactory;
+
+ public RowDataTaskWriterFactory(Table table,
+ RowType flinkSchema,
+ long targetFileSizeBytes,
+ FileFormat format,
+ List<Integer> equalityFieldIds,
+ boolean upsert,
+ boolean appendMode) {
+ this.table = table;
+ this.schema = table.schema();
+ this.flinkSchema = flinkSchema;
+ this.spec = table.spec();
+ this.io = table.io();
+ this.targetFileSizeBytes = targetFileSizeBytes;
+ this.format = format;
+ this.equalityFieldIds = equalityFieldIds;
+ this.upsert = upsert;
+ this.appendMode = appendMode;
+
+ if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
+ this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+ } else {
+ // TODO provide the ability to customize the equality-delete row schema.
+ this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
+ ArrayUtil.toIntArray(equalityFieldIds), schema, null);
+ }
+ }
+
+ @Override
+ public void initialize(int taskId, int attemptId) {
+ this.outputFileFactory = OutputFileFactory.builderFor(table, taskId, attemptId).build();
+ }
+
+ @Override
+ public TaskWriter<RowData> create() {
+ Preconditions.checkNotNull(outputFileFactory,
+ "The outputFileFactory shouldn't be null if we have invoked the initialize().");
+
+ if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) {
+ // Initialize a task writer to write INSERT only.
+ if (spec.isUnpartitioned()) {
+ return new UnpartitionedWriter<>(
+ spec, format, appenderFactory, outputFileFactory, io, targetFileSizeBytes);
+ } else {
+ return new RowDataPartitionedFanoutWriter(spec, format, appenderFactory, outputFileFactory,
+ io, targetFileSizeBytes, schema, flinkSchema);
+ }
+ } else {
+ // Initialize a task writer to write both INSERT and equality DELETE.
+ if (spec.isUnpartitioned()) {
+ return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
+ targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
+ } else {
+ return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
+ targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
+ }
+ }
+ }
+
+ private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter<RowData> {
+
+ private final PartitionKey partitionKey;
+ private final RowDataWrapper rowDataWrapper;
+
+ RowDataPartitionedFanoutWriter(
+ PartitionSpec spec, FileFormat format, FileAppenderFactory<RowData> appenderFactory,
+ OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema,
+ RowType flinkSchema) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+ this.partitionKey = new PartitionKey(spec, schema);
+ this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+ }
+
+ @Override
+ protected PartitionKey partition(RowData row) {
+ partitionKey.partition(rowDataWrapper.wrap(row));
+ return partitionKey;
+ }
+ }
+}
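
To summarize the branching in create() above: a small self-contained sketch (illustrative class and method names, not part of the commit) of which writer the factory selects.

    public class WriterSelectionSketch {

        // Mirrors RowDataTaskWriterFactory#create: append mode or missing equality
        // fields means insert-only writers; otherwise delta writers that can also
        // emit equality deletes. Partitioning picks the fanout/partitioned variant.
        static String pickWriter(boolean appendMode, boolean hasEqualityFields, boolean partitioned) {
            if (appendMode || !hasEqualityFields) {
                return partitioned ? "RowDataPartitionedFanoutWriter (insert only)"
                        : "UnpartitionedWriter (insert only)";
            }
            return partitioned ? "PartitionedDeltaWriter (insert + equality delete)"
                    : "UnpartitionedDeltaWriter (insert + equality delete)";
        }

        public static void main(String[] args) {
            System.out.println(pickWriter(true, true, true));    // RowDataPartitionedFanoutWriter (insert only)
            System.out.println(pickWriter(false, true, false));  // UnpartitionedDeltaWriter (insert + equality delete)
        }
    }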
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/UnpartitionedDeltaWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/UnpartitionedDeltaWriter.java
new file mode 100644
index 000000000..c88724880
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/UnpartitionedDeltaWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
+ */
+class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
+ private final RowDataDeltaWriter writer;
+
+ UnpartitionedDeltaWriter(PartitionSpec spec,
+ FileFormat format,
+ FileAppenderFactory<RowData> appenderFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSize,
+ Schema schema,
+ RowType flinkSchema,
+ List<Integer> equalityFieldIds,
+ boolean upsert) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+ upsert);
+ this.writer = new RowDataDeltaWriter(null);
+ }
+
+ @Override
+ RowDataDeltaWriter route(RowData row) {
+ return writer;
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+}
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index aabd3e7ce..4dc99082b 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -96,13 +96,13 @@
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
- <artifactId>sort-connector-hive</artifactId>
+ <artifactId>sort-connector-hbase</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
- <artifactId>sort-connector-hbase</artifactId>
+ <artifactId>sort-connector-hive</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
@@ -112,6 +112,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-iceberg-dlc</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-elasticsearch6</artifactId>
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DLCIcebergSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DLCIcebergSqlParseTest.java
index 5857fc70c..739c507ce 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DLCIcebergSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DLCIcebergSqlParseTest.java
@@ -60,8 +60,8 @@ public class DLCIcebergSqlParseTest {
null,
map,
"id",
- Collections.singletonList("test"),
- "localhost",
+ Collections.singletonList("mysql_input"),
+ "127.0.0.1",
"root",
"123456",
"inlong",
@@ -74,13 +74,13 @@ public class DLCIcebergSqlParseTest {
private DLCIcebergLoadNode buildDLCLoadNode() {
// set HIVE_CONF_DIR,or set uri and warehouse
Map<String, String> properties = new HashMap<>();
+
properties.put(DLCConstant.DLC_REGION, "ap-beijing");
properties.put(DLCConstant.DLC_SECRET_ID, "XXXXXXXXXXX");
properties.put(DLCConstant.DLC_SECRET_KEY, "XXXXXXXXXXX");
+ properties.put(DLCConstant.DLC_USER_APPID, "XXXXXXXXXXX");
+ properties.put(DLCConstant.DLC_MANAGED_ACCOUNT_UID, "XXXXXXXXXXX");
- properties.put(DLCConstant.FS_COS_REGION, "ap-beijing");
- properties.put(DLCConstant.FS_COS_SECRET_ID, "XXXXXXXXXXX");
- properties.put(DLCConstant.FS_COS_SECRET_KEY, "XXXXXXXXXXX");
List<FieldRelation> relations = Arrays
.asList(new FieldRelation(new FieldInfo("id", new IntFormatInfo()),
new FieldInfo("id", new IntFormatInfo())),
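
The test change above reflects the adapted DLC auth scheme: the COS-specific FS_COS_* keys are dropped and the load node is configured with the user appid and managed account uid instead. A minimal sketch of the resulting property map (the class name and placeholder values are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.inlong.sort.protocol.constant.DLCConstant;

    public class DlcAuthPropertiesSketch {

        // Builds the catalog properties the updated test wires into DLCIcebergLoadNode.
        public static Map<String, String> dlcAuthProperties() {
            Map<String, String> properties = new HashMap<>();
            properties.put(DLCConstant.DLC_REGION, "ap-beijing");
            properties.put(DLCConstant.DLC_SECRET_ID, "<secret-id>");
            properties.put(DLCConstant.DLC_SECRET_KEY, "<secret-key>");
            properties.put(DLCConstant.DLC_USER_APPID, "<appid>");
            properties.put(DLCConstant.DLC_MANAGED_ACCOUNT_UID, "<managed-account-uid>");
            return properties;
        }
    }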
diff --git a/licenses/inlong-manager/LICENSE b/licenses/inlong-manager/LICENSE
index 41449fc11..73687e2ac 100644
--- a/licenses/inlong-manager/LICENSE
+++ b/licenses/inlong-manager/LICENSE
@@ -623,11 +623,11 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
org.apache.hadoop:hadoop-yarn-api:2.10.1 - Apache Hadoop YARN API (https://github.com/apache/hadoop/tree/branch-2.10.1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api), (Apache License, Version 2.0)
org.apache.hadoop:hadoop-yarn-client:2.10.1 - Apache Hadoop YARN Client (https://github.com/apache/hadoop/tree/branch-2.10.1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client), (Apache License, Version 2.0)
com.carrotsearch:hppc:0.7.2 - HPPC Collections (https://github.com/carrotsearch/hppc/tree/0.7.2), (The Apache Software License, Version 2.0)
- org.apache.iceberg:iceberg-api:0.13.1 - Apace Iceberg API (https://https://iceberg.apache.org/), (Apache License, Version 2.0)
- org.apache.iceberg:iceberg-bundled-guava:0.13.1 - Apache Iceberg Bundled-guava (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
- org.apache.iceberg:iceberg-common:0.13.1 - Apache Iceberg Common (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
- org.apache.iceberg:iceberg-core:0.13.1 - Apace Iceberg Core (https://https://iceberg.apache.org/), (Apache License, Version 2.0)
- org.apache.iceberg:iceberg-hive-metastore:0.13.1 - Apace Iceberg Hive Metastore (https://https://iceberg.apache.org/), (Apache License, Version 2.0)
+ org.apache.iceberg:iceberg-api:0.13.2 - Apache Iceberg API (https://iceberg.apache.org/), (Apache License, Version 2.0)
+ org.apache.iceberg:iceberg-bundled-guava:0.13.2 - Apache Iceberg Bundled-guava (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+ org.apache.iceberg:iceberg-common:0.13.2 - Apache Iceberg Common (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+ org.apache.iceberg:iceberg-core:0.13.2 - Apache Iceberg Core (https://iceberg.apache.org/), (Apache License, Version 2.0)
+ org.apache.iceberg:iceberg-hive-metastore:0.13.2 - Apache Iceberg Hive Metastore (https://iceberg.apache.org/), (Apache License, Version 2.0)
org.codehaus.jackson:jackson-core-asl:1.9.13 - Jackson (https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-core-asl/1.9.13), (The Apache Software License, Version 2.0)
com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.2 - Jackson-dataformat-YAML (https://github.com/FasterXML/jackson-dataformat-yaml/tree/jackson-dataformat-yaml-2.13.2), (The Apache Software License, Version 2.0)
com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.2 - Jackson datatype: JSR310 (https://github.com/FasterXML/jackson-modules-java8/tree/jackson-modules-java8-2.13.2), (The Apache Software License, Version 2.0)
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 7c823b0af..980c471b7 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -503,7 +503,35 @@
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkConfigOptions.java
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
- Source : iceberg-flink-runtime-1.13 1.13.5 (Please note that the software have been modified.)
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/BaseDeltaTaskWriter.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionedDeltaWriter.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
+ inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/UnpartitionedDeltaWriter.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalog.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkCatalogFactory.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/FlinkDynamicTableFactory.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/IcebergTableSink.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/BaseDeltaTaskWriter.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifests.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/DeltaManifestsSerializer.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkManifestUtil.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/FlinkSink.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergFilesCommitter.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/ManifestOutputFileFactory.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionedDeltaWriter.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/PartitionKeySelector.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
+ inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/UnpartitionedDeltaWriter.java
+ Source : iceberg-flink:iceberg-flink-1.13:0.13.2 (Please note that the software have been modified.)
License : https://github.com/apache/iceberg/LICENSE
1.3.7 inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/AbstractJdbcDialect.java
@@ -542,6 +570,7 @@
Source : flink-table-runtime-blink_2.11-13.2-rc2 2.2.1 (Please note that the software have been modified.)
License : https://github.com/apache/flink/blob/release-1.13.2-rc2/LICENSE
+
=======================================================================
Apache InLong Subcomponents:
@@ -771,7 +800,7 @@ The text of each license is the standard Apache 2.0 license.
org.ini4j:ini4j:0.5.1 - ini4j (https://sourceforge.net/projects/ini4j), (The Apache Software License, Version 2.0)
com.squareup.okhttp:logging-interceptor:2.7.5 - logging-interceptor (https://github.com/square/okhttp/tree/master/okhttp-logging-interceptor), (The Apache Software License, Version 2.0)
com.tencentcloudapi:tencentcloud-sdk-java:3.1.545 - tencentcloud-sdk-java (https://github.com/TencentCloud/tencentcloud-sdk-java), (The Apache Software License, Version 2.0)
- com.qcloud:dlc-data-catalog-metastore-client:1.1 - dlc-data-catalog-metastore-client (https://mvnrepository.com/artifact/com.qcloud/dlc-data-catalog-metastore-client/1.1), (The Apache Software License, Version 2.0)
+ com.qcloud:dlc-data-catalog-metastore-client:1.1.1 - dlc-data-catalog-metastore-client (https://mvnrepository.com/artifact/com.qcloud/dlc-data-catalog-metastore-client/1.1), (The Apache Software License, Version 2.0)
org.apache.doris:flink-doris-connector-1.13_2.11:1.0.3 - Flink Connector for Apache Doris (https://github.com/apache/doris-flink-connector/tree/1.13_2.11-1.0.3), (The Apache Software License, Version 2.0)
@@ -804,16 +833,16 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
com.carrotsearch:hppc:0.7.1 - HPPC Collections (https://github.com/carrotsearch/hppc/tree/0.7.1), (The Apache Software License, Version 2.0)
com.carrotsearch:hppc:0.7.2 - HPPC Collections (https://github.com/carrotsearch/hppc/tree/0.7.2), (The Apache Software License, Version 2.0)
com.carrotsearch:hppc:0.8.1 - HPPC Collections (https://github.com/carrotsearch/hppc/tree/0.8.1), (The Apache Software License, Version 2.0)
- org.apache.iceberg:iceberg-api:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
- org.apache.iceberg:iceberg-bundled-guava:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
- org.apache.iceberg:iceberg-common:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
- org.apache.iceberg:iceberg-core:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
- org.apache.iceberg:iceberg-data:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
- org.apache.iceberg:iceberg-flink-runtime-1.13:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
- org.apache.iceberg:iceberg-hive-metastore:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
- org.apache.iceberg:iceberg-orc:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
- org.apache.iceberg:iceberg-parquet:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
- org.apache.iceberg:iceberg-flink-runtime-1.13:jar:0.13.1 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+ org.apache.iceberg:iceberg-api:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+ org.apache.iceberg:iceberg-bundled-guava:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+ org.apache.iceberg:iceberg-common:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+ org.apache.iceberg:iceberg-core:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+ org.apache.iceberg:iceberg-data:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+ org.apache.iceberg:iceberg-flink-runtime-1.13:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+ org.apache.iceberg:iceberg-hive-metastore:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+ org.apache.iceberg:iceberg-orc:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+ org.apache.iceberg:iceberg-parquet:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
+ org.apache.iceberg:iceberg-flink-runtime-1.13:jar:0.13.2 - Apache Iceberg (https://iceberg.apache.org), (The Apache Software License, Version 2.0)
org.codehaus.jackson:jackson-core-asl:1.9.13 - Jackson (https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-core-asl/1.9.13), (The Apache Software License, Version 2.0)
com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.2 - Jackson-dataformat-YAML (https://github.com/FasterXML/jackson-dataformat-yaml/tree/jackson-dataformat-yaml-2.13.2), (The Apache Software License, Version 2.0)
com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.2 - Jackson datatype: JSR310 (https://github.com/FasterXML/jackson-modules-java8/tree/jackson-modules-java8-2.13.2), (The Apache Software License, Version 2.0)
diff --git a/pom.xml b/pom.xml
index 35c0423f0..a96c42a46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -201,8 +201,7 @@
<pulsar.version>2.8.1</pulsar.version>
<pulsar.testcontainers.version>1.15.3</pulsar.testcontainers.version>
<kafka.version>2.4.1</kafka.version>
- <iceberg.flink.version>0.13.1</iceberg.flink.version>
- <iceberg.version>0.13.1</iceberg.version>
+ <iceberg.version>0.13.2</iceberg.version>
<flink.version>1.13.5</flink.version>
<flink.minor.version>1.13</flink.minor.version>
<flink.connector.mysql.cdc.version>2.2.1</flink.connector.mysql.cdc.version>
@@ -249,7 +248,9 @@
<tencentcloud.api.version>3.1.545</tencentcloud.api.version>
<cos.hadoop.version>2.7.5-5.9.3</cos.hadoop.version>
<cos.bundle.version>5.6.35</cos.bundle.version>
- <dlc.client.version>1.1</dlc.client.version>
+ <dlc.client.version>1.1.1</dlc.client.version>
+ <dlc.jdbc.version>2.2.6</dlc.jdbc.version>
+ <cos.lakefs.plugin.version>1.0</cos.lakefs.plugin.version>
<esri-geometry-api.version>2.0.0</esri-geometry-api.version>
<HikariCP.version>4.0.3</HikariCP.version>
<caffeine.version>2.9.3</caffeine.version>
@@ -1033,7 +1034,7 @@
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-1.13</artifactId>
- <version>${iceberg.flink.version}</version>
+ <version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -1570,6 +1571,16 @@
<artifactId>dlc-data-catalog-metastore-client</artifactId>
<version>${dlc.client.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.qcloud.cos</groupId>
+ <artifactId>lakefs-cloud-plugin</artifactId>
+ <version>${cos.lakefs.plugin.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.qcloud</groupId>
+ <artifactId>dlc-jdbc</artifactId>
+ <version>${dlc.jdbc.version}</version>
+ </dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>