Posted to commits@iceberg.apache.org by dw...@apache.org on 2023/04/12 15:15:42 UTC
[iceberg-docs] branch 1.2.1 updated: 1.2.1 Docs Update
This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch 1.2.1
in repository https://gitbox.apache.org/repos/asf/iceberg-docs.git
The following commit(s) were added to refs/heads/1.2.1 by this push:
new 253d32e7 1.2.1 Docs Update
253d32e7 is described below
commit 253d32e7506868583f5f34d32df59de55d8d14e2
Author: Daniel Weeks <dw...@apache.org>
AuthorDate: Wed Apr 12 08:13:27 2023 -0700
1.2.1 Docs Update
---
docs/content/aws.md | 13 +-
docs/content/flink-actions.md | 42 ++
docs/content/flink-configuration.md | 161 +++++++
docs/content/flink-ddl.md | 213 +++++++++
docs/content/flink-getting-started.md | 851 ++--------------------------------
docs/content/flink-queries.md | 450 ++++++++++++++++++
docs/content/flink-writes.md | 262 +++++++++++
docs/content/spark-configuration.md | 2 +-
docs/content/spark-procedures.md | 116 +++++
9 files changed, 1300 insertions(+), 810 deletions(-)
diff --git a/docs/content/aws.md b/docs/content/aws.md
index 81323eeb..2f8e1911 100644
--- a/docs/content/aws.md
+++ b/docs/content/aws.md
@@ -60,7 +60,6 @@ AWS_SDK_VERSION=2.20.18
AWS_MAVEN_GROUP=software.amazon.awssdk
AWS_PACKAGES=(
"bundle"
- "url-connection-client"
)
for pkg in "${AWS_PACKAGES[@]}"; do
DEPENDENCIES+=",$AWS_MAVEN_GROUP:$pkg:$AWS_SDK_VERSION"
@@ -93,7 +92,6 @@ AWS_SDK_VERSION=2.20.18
AWS_MAVEN_URL=$MAVEN_URL/software/amazon/awssdk
AWS_PACKAGES=(
"bundle"
- "url-connection-client"
)
for pkg in "${AWS_PACKAGES[@]}"; do
wget $AWS_MAVEN_URL/$pkg/$AWS_SDK_VERSION/$pkg-$AWS_SDK_VERSION.jar
@@ -103,7 +101,6 @@ done
/path/to/bin/sql-client.sh embedded \
-j iceberg-flink-runtime-$ICEBERG_VERSION.jar \
-j bundle-$AWS_SDK_VERSION.jar \
- -j url-connection-client-$AWS_SDK_VERSION.jar \
shell
```
@@ -137,7 +134,6 @@ and then add them to the Hive classpath or add the jars at runtime in CLI:
```
add jar /my/path/to/iceberg-hive-runtime.jar;
add jar /my/path/to/aws/bundle.jar;
-add jar /my/path/to/aws/url-connection-client.jar;
```
With those dependencies, you can register a Glue catalog and create external tables in Hive at runtime in CLI by:
@@ -200,7 +196,7 @@ and table name validation are skipped, there is no guarantee that downstream sys
By default, Iceberg uses Glue's optimistic locking for concurrent updates to a table.
With optimistic locking, each table has a version id.
If users retrieve the table metadata, Iceberg records the version id of that table.
-Users can update the table, but only if the version id on the server side has not changed.
+Users can update the table as long as the version ID on the server side remains unchanged.
If there is a version mismatch, it means that someone else has modified the table before you did.
The update attempt fails, because you have a stale version of the table.
If this happens, Iceberg refreshes the metadata and checks whether there is a potential conflict.
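
The version check above behaves like a compare-and-swap on the table's version ID. The minimal sketch below models it with a hypothetical in-memory `VersionedTableStore` (not part of Iceberg or the AWS SDK). A commit succeeds only if the entry the writer read is still current, and a retry re-reads the latest version first. Iceberg's real conflict check after a refresh is left out.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical in-memory stand-in for a catalog entry guarded by optimistic locking.
// It is not the AWS Glue or Iceberg API; it only models the version ID check.
final class VersionedTableStore {
  // Immutable snapshot of what a reader saw: the version ID and the metadata location.
  static final class Entry {
    private final long versionId;
    private final String metadataLocation;

    Entry(long versionId, String metadataLocation) {
      this.versionId = versionId;
      this.metadataLocation = metadataLocation;
    }

    long versionId() { return versionId; }

    String metadataLocation() { return metadataLocation; }
  }

  private final AtomicReference<Entry> current;

  VersionedTableStore(String initialMetadataLocation) {
    this.current = new AtomicReference<>(new Entry(0L, initialMetadataLocation));
  }

  // Reading the table records the version ID the caller is working from.
  Entry read() {
    return current.get();
  }

  // Succeeds only if the server-side entry is still exactly the one the caller read.
  boolean commit(Entry base, String newMetadataLocation) {
    return current.compareAndSet(base, new Entry(base.versionId() + 1, newMetadataLocation));
  }

  // On a version mismatch, refresh and try again (conflict checks are omitted in this sketch).
  boolean commitWithRetry(UnaryOperator<String> update, int maxAttempts) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      Entry base = read();
      if (commit(base, update.apply(base.metadataLocation()))) {
        return true;
      }
    }
    return false;
  }
}
```

If two writers read version 0 and both try to commit, only the first succeeds. The second sees a stale version and must refresh before retrying.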
@@ -586,9 +582,9 @@ For more details of configuration, see sections [URL Connection HTTP Client Conf
Configure the following property to set the type of HTTP client:
-| Property | Default | Description |
-|------------------|---------------|------------------------------------------------------------------------------------------------------------|
-| http-client.type | urlconnection | Types of HTTP Client. <br/> `urlconnection`: URL Connection HTTP Client <br/> `apache`: Apache HTTP Client |
+| Property | Default | Description |
+|------------------|---------|------------------------------------------------------------------------------------------------------------|
+| http-client.type | apache | Type of HTTP client. <br/> `urlconnection`: URL Connection HTTP Client <br/> `apache`: Apache HTTP Client |
#### URL Connection HTTP Client Configurations
@@ -656,7 +652,6 @@ LIB_PATH=/usr/share/aws/aws-java-sdk/
AWS_PACKAGES=(
"bundle"
- "url-connection-client"
)
ICEBERG_PACKAGES=(
diff --git a/docs/content/flink-actions.md b/docs/content/flink-actions.md
new file mode 100644
index 00000000..1fc5bb85
--- /dev/null
+++ b/docs/content/flink-actions.md
@@ -0,0 +1,42 @@
+---
+title: "Flink Actions"
+url: flink-actions
+aliases:
+ - "flink/flink-actions"
+menu:
+ main:
+ parent: Flink
+ weight: 500
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements. See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Rewrite files action
+
+Iceberg provides an API to rewrite small files into larger files by submitting Flink batch jobs. The behavior of this Flink action is the same as Spark's [rewriteDataFiles](../maintenance/#compact-data-files).
+
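+```java
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesActionResult;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.actions.Actions;
+
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+tableLoader.open(); // the loader must be opened before loadTable() is called
+Table table = tableLoader.loadTable();
+RewriteDataFilesActionResult result = Actions.forTable(table)
+ .rewriteDataFiles()
+ .execute();
+```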
+
+For more details about the rewrite files action, refer to [RewriteDataFilesAction](../../../javadoc/{{% icebergVersion %}}/org/apache/iceberg/flink/actions/RewriteDataFilesAction.html).
diff --git a/docs/content/flink-configuration.md b/docs/content/flink-configuration.md
new file mode 100644
index 00000000..7e531baa
--- /dev/null
+++ b/docs/content/flink-configuration.md
@@ -0,0 +1,161 @@
+---
+title: "Flink Configuration"
+url: flink-configuration
+aliases:
+ - "flink/flink-configuration"
+menu:
+ main:
+ parent: Flink
+ weight: 600
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements. See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+# Flink Configuration
+
+## Catalog Configuration
+
+A catalog is created and named by executing the following query (replace `<catalog_name>` with your catalog name and
+`<config_key>`=`<config_value>` with catalog implementation config):
+
+```sql
+CREATE CATALOG <catalog_name> WITH (
+ 'type'='iceberg',
+ '<config_key>'='<config_value>'
+);
+```
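
As a rough illustration of the quoting this template expects, the sketch below renders a `CREATE CATALOG` statement from a property map. `CatalogDdl` is a hypothetical helper, not an Iceberg or Flink API. It assumes the catalog name is a plain identifier that needs no quoting, and it escapes single quotes in keys and values by doubling them, as in standard SQL string literals.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that renders the CREATE CATALOG template above.
final class CatalogDdl {
  // Quotes a SQL string literal, doubling any embedded single quotes.
  static String literal(String value) {
    return "'" + value.replace("'", "''") + "'";
  }

  // Assumes catalogName is a plain identifier; options keep the map's iteration order.
  static String createCatalog(String catalogName, Map<String, String> config) {
    StringJoiner options = new StringJoiner(",\n  ", "(\n  ", "\n)");
    for (Map.Entry<String, String> e : config.entrySet()) {
      options.add(literal(e.getKey()) + "=" + literal(e.getValue()));
    }
    return "CREATE CATALOG " + catalogName + " WITH " + options + ";";
  }
}
```

Passing a `LinkedHashMap` with `type` first keeps the output in the same order as the template.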
+
+The following properties can be set globally and are not limited to a specific catalog implementation:
+
+| Property | Required | Values | Description |
+| ---------------------------- |----------| -------------------------- |----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| type | ✔️ | iceberg | Must be `iceberg`. |
+| catalog-type | | `hive`, `hadoop` or `rest` | `hive`, `hadoop` or `rest` for built-in catalogs, or left unset for custom catalog implementations using catalog-impl. |
+| catalog-impl | | | The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset. |
+| property-version | | | Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is `1`. |
+| cache-enabled | | `true` or `false` | Whether to enable catalog cache, default value is `true`. |
+| cache.expiration-interval-ms | | | How long catalog entries are locally cached, in milliseconds. Negative values such as `-1` disable expiration; `0` is not allowed. The default value is `-1`. |
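
The rules in this table can be read as a small validation step. The sketch below checks a property map against them as written. `CatalogPropertiesCheck` is hypothetical, and treating `catalog-type` and `catalog-impl` together as an error is an assumption. Iceberg's own catalog factory may apply different defaults when a property is unset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical checker that mirrors the global catalog property table as written.
final class CatalogPropertiesCheck {
  static List<String> validate(Map<String, String> props) {
    List<String> errors = new ArrayList<>();
    if (!"iceberg".equals(props.get("type"))) {
      errors.add("type must be 'iceberg'");
    }
    String catalogType = props.get("catalog-type");
    String catalogImpl = props.get("catalog-impl");
    if (catalogType == null && catalogImpl == null) {
      errors.add("catalog-impl must be set when catalog-type is unset");
    }
    if (catalogType != null && catalogImpl != null) {
      // Assumption: setting both is treated as ambiguous.
      errors.add("set either catalog-type or catalog-impl, not both");
    }
    if (catalogType != null && !List.of("hive", "hadoop", "rest").contains(catalogType)) {
      errors.add("catalog-type must be hive, hadoop or rest");
    }
    String expiration = props.get("cache.expiration-interval-ms");
    if (expiration != null && Long.parseLong(expiration) == 0L) {
      errors.add("cache.expiration-interval-ms must not be 0");
    }
    return errors;
  }
}
```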
+
+The following properties can be set if using the Hive catalog:
+
+| Property | Required | Values | Description |
+| --------------- |----------| ------ |--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| uri | ✔️ | | The Hive metastore's thrift URI. |
+| clients | | | The Hive metastore client pool size, default value is 2. |
+| warehouse | | | The Hive warehouse location. Specify this path if you neither set `hive-conf-dir` to a directory containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to the classpath. |
+| hive-conf-dir | | | Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. If both `hive-conf-dir` and `warehouse` are set when creating the Iceberg catalog, the value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or the Hive configuration file on the classpath) is overwritten with the `warehouse` value. |
+| hadoop-conf-dir | | | Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values. |
+
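
The following sketch shows one way the effective warehouse location could be resolved from `warehouse` and `hive-site.xml`. `HiveWarehouseResolver` is hypothetical. The `hiveSite` map stands in for values already parsed from `hive-site.xml`, whether that file comes from `hive-conf-dir` or from the classpath.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: resolves the effective Hive warehouse location.
final class HiveWarehouseResolver {
  static Optional<String> effectiveWarehouse(Map<String, String> catalogProps, Map<String, String> hiveSite) {
    String explicit = catalogProps.get("warehouse");
    if (explicit != null) {
      // An explicit 'warehouse' overrides hive.metastore.warehouse.dir from hive-site.xml.
      return Optional.of(explicit);
    }
    // Otherwise fall back to hive-site.xml; empty means no location was configured anywhere.
    return Optional.ofNullable(hiveSite.get("hive.metastore.warehouse.dir"));
  }
}
```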
+The following properties can be set if using the Hadoop catalog:
+
+| Property | Required | Values | Description |
+| --------- |-------------| ------ | ---------------------------------------------------------- |
+| warehouse | ✔️ | | The HDFS directory to store metadata files and data files. |
+
+The following properties can be set if using the REST catalog:
+
+| Property | Required | Values | Description |
+| ---------- |----------| ------ |-----------------------------------------------------------------------------|
+| uri | ✔️ | | The URL to the REST Catalog. |
+| credential | | | A credential to exchange for a token in the OAuth2 client credentials flow. |
+| token | | | A token which will be used to interact with the server. |
+
+
+## Runtime configuration
+
+### Read options
+
+Flink read options are passed when configuring the Flink IcebergSource:
+
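+```java
+import java.time.Duration;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.IcebergSource;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
+
+IcebergSource<RowData> source = IcebergSource.forRowData()
+ .tableLoader(TableLoader.fromCatalog(...))
+ .assignerFactory(new SimpleSplitAssignerFactory())
+ .streaming(true)
+ .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) // pairs with startSnapshotId
+ .startSnapshotId(3821550127947089987L)
+ .monitorInterval(Duration.ofSeconds(10L)) // or .set("monitor-interval", "10s") or .set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
+ .build();
+```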
+
+For Flink SQL, read options can be passed in via SQL hints like this:
+
+```sql
+SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
+...
+```
+
+Options can also be passed in via the Flink configuration, which applies them to the current session. Note that not all options support this mode.
+
+```java
+env.getConfig()
+ .getConfiguration()
+ .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
+...
+```
+
+`Read option` has the highest priority, followed by `Flink configuration` and then `Table property`.
+
+| Read option | Flink configuration | Table property | Default | Description |
+| --------------------------- | --------------------------------------------- | ---------------------------- | -------------------------------- | ------------------------------------------------------------ |
+| snapshot-id | N/A | N/A | null | For time travel in batch mode. Read data from the specified snapshot-id. |
+| case-sensitive | connector.iceberg.case-sensitive | N/A | false | If true, match column name in a case sensitive way. |
+| as-of-timestamp | N/A | N/A | null | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
+| starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots shou [...]
+| start-snapshot-timestamp | N/A | N/A | null | Start to read data from the most recent snapshot as of the given time in milliseconds. |
+| start-snapshot-id | N/A | N/A | null | Start to read data from the specified snapshot-id. |
+| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
+| split-size | connector.iceberg.split-size | read.split.target-size | 128 MB | Target size when combining input splits. |
+| split-lookback | connector.iceberg.split-lookback | read.split.planning-lookback | 10 | Number of bins to consider when combining input splits. |
+| split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | 4 MB | The estimated cost to open a file, used as a minimum weight when combining splits. |
+| streaming | connector.iceberg.streaming | N/A | false | Sets whether the current task runs in streaming or batch mode. |
+| monitor-interval | connector.iceberg.monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
+| include-column-stats | connector.iceberg.include-column-stats | N/A | false | Whether to load the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
+| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
+| limit | connector.iceberg.limit | N/A | -1 | Limits the number of output rows. |
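
The precedence rule stated above can be sketched as a lookup chain: read option first, then Flink configuration, then table property, then the default. `ReadOptionResolver` is a hypothetical helper. A `null` key stands for an `N/A` cell in the table.

```java
import java.util.Map;

// Hypothetical resolver for: read option > Flink configuration > table property > default.
final class ReadOptionResolver {
  static String resolve(
      Map<String, String> readOptions, String readKey,
      Map<String, String> flinkConf, String flinkConfKey,
      Map<String, String> tableProps, String tablePropKey,
      String defaultValue) {
    // A null key means the option has no form at that level (N/A in the table).
    if (readKey != null && readOptions.containsKey(readKey)) {
      return readOptions.get(readKey);
    }
    if (flinkConfKey != null && flinkConf.containsKey(flinkConfKey)) {
      return flinkConf.get(flinkConfKey);
    }
    if (tablePropKey != null && tableProps.containsKey(tablePropKey)) {
      return tableProps.get(tablePropKey);
    }
    return defaultValue;
  }
}
```

For `split-size`, the lookup keys are `split-size`, `connector.iceberg.split-size`, and `read.split.target-size`, and the default is 128 MB.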
+
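
The `split-size`, `split-lookback`, and `split-file-open-cost` options describe a bin-packing step. The following simplified sketch shows how they interact, assuming first-fit packing that closes the oldest open bin once more than `lookback` bins are open. `SplitPacker` is hypothetical, and Iceberg's actual split planner may differ in details.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified first-fit bin packing with a bounded number of open bins (the lookback).
// Illustrates split-size, split-lookback and split-file-open-cost; not Iceberg's planner.
final class SplitPacker {
  private static final class Bin {
    final List<Long> files = new ArrayList<>();
    long weight = 0L;
  }

  static List<List<Long>> pack(List<Long> fileSizes, long splitSize, int lookback, long openFileCost) {
    if (lookback < 1) {
      throw new IllegalArgumentException("lookback must be at least 1");
    }
    List<List<Long>> splits = new ArrayList<>();
    Deque<Bin> openBins = new ArrayDeque<>();
    for (long size : fileSizes) {
      // Every file weighs at least openFileCost, so many tiny files cannot pile into one split.
      long weight = Math.max(size, openFileCost);
      Bin target = null;
      for (Bin bin : openBins) {
        if (bin.weight + weight <= splitSize) {
          target = bin;
          break;
        }
      }
      if (target == null) {
        target = new Bin();
        openBins.addLast(target);
        if (openBins.size() > lookback) {
          // Too many open bins: close the oldest one and emit it as a split.
          splits.add(openBins.removeFirst().files);
        }
      }
      target.files.add(size);
      target.weight += weight;
    }
    // Flush the bins that are still open.
    for (Bin bin : openBins) {
      splits.add(bin.files);
    }
    return splits;
  }
}
```

With sizes in MB, a split size of 128 and an open cost of 4, forty 1 MB files form two splits (32 files, then 8) rather than one. A larger lookback lets later small files fill gaps left in earlier bins.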
+
+### Write options
+
+Flink write options are passed when configuring the FlinkSink, like this:
+
+```java
+FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .set("write-format", "orc")
+ .set(FlinkWriteOptions.OVERWRITE_MODE.key(), "true"); // set(String, String) takes the option key
+```
+
+For Flink SQL, write options can be passed in via SQL hints like this:
+
+```sql
+INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
+...
+```
+
+| Flink option | Default | Description |
+| ---------------------- | ------------------------------------------ | ------------------------------------------------------------ |
+| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
+| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
+| upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled |
+| overwrite-enabled | false | Whether to overwrite the table's data. Overwrite mode should not be enabled when writing an UPSERT data stream. |
+| distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode |
+| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
+| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
+| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
+| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |
\ No newline at end of file
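
The `overwrite-enabled` and `write-format` rows above can be read as a pre-flight check. The sketch below uses a hypothetical `WriteOptionsCheck` class, which is not part of Iceberg's API. It rejects overwrite combined with upsert, where upsert falls back to the table's `write.upsert.enabled`. It also resolves `write-format` against `write.format.default`, falling back to `parquet`, Iceberg's default file format.

```java
import java.util.Map;

// Hypothetical pre-flight checks for FlinkSink write options (not part of Iceberg's API).
final class WriteOptionsCheck {
  static void validate(Map<String, String> writeOptions, Map<String, String> tableProps) {
    boolean overwrite = Boolean.parseBoolean(writeOptions.getOrDefault("overwrite-enabled", "false"));
    // upsert-enabled falls back to the table's write.upsert.enabled property.
    String upsertValue = writeOptions.getOrDefault(
        "upsert-enabled", tableProps.getOrDefault("write.upsert.enabled", "false"));
    if (overwrite && Boolean.parseBoolean(upsertValue)) {
      throw new IllegalArgumentException("overwrite-enabled must not be combined with upsert");
    }
  }

  // write-format overrides the table's write.format.default, which defaults to parquet.
  static String writeFormat(Map<String, String> writeOptions, Map<String, String> tableProps) {
    return writeOptions.getOrDefault("write-format", tableProps.getOrDefault("write.format.default", "parquet"));
  }
}
```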
diff --git a/docs/content/flink-ddl.md b/docs/content/flink-ddl.md
new file mode 100644
index 00000000..67f9e21d
--- /dev/null
+++ b/docs/content/flink-ddl.md
@@ -0,0 +1,213 @@
+---
+title: "Flink DDL"
+url: flink-ddl
+aliases:
+ - "flink/flink-ddl"
+menu:
+ main:
+ parent: Flink
+ weight: 200
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements. See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## DDL commands
+
+### `CREATE CATALOG`
+
+#### Hive catalog
+
+This creates an Iceberg catalog named `hive_catalog` that can be configured using `'catalog-type'='hive'`, which loads tables from the Hive metastore:
+
+```sql
+CREATE CATALOG hive_catalog WITH (
+ 'type'='iceberg',
+ 'catalog-type'='hive',
+ 'uri'='thrift://localhost:9083',
+ 'clients'='5',
+ 'property-version'='1',
+ 'warehouse'='hdfs://nn:8020/warehouse/path'
+);
+```
+
+The following properties can be set if using the Hive catalog:
+
+* `uri`: The Hive metastore's thrift URI. (Required)
+* `clients`: The Hive metastore client pool size, default value is 2. (Optional)
+* `warehouse`: The Hive warehouse location. Specify this path if you neither set `hive-conf-dir` to a directory containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to the classpath.
+* `hive-conf-dir`: Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. If both `hive-conf-dir` and `warehouse` are set when creating the Iceberg catalog, the value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or the Hive configuration file on the classpath) is overwritten with the `warehouse` value.
+* `hadoop-conf-dir`: Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values.
+
+#### Hadoop catalog
+
+Iceberg also supports a directory-based catalog in HDFS that can be configured using `'catalog-type'='hadoop'`:
+
+```sql
+CREATE CATALOG hadoop_catalog WITH (
+ 'type'='iceberg',
+ 'catalog-type'='hadoop',
+ 'warehouse'='hdfs://nn:8020/warehouse/path',
+ 'property-version'='1'
+);
+```
+
+The following properties can be set if using the Hadoop catalog:
+
+* `warehouse`: The HDFS directory to store metadata files and data files. (Required)
+
+Execute the SQL command `USE CATALOG hadoop_catalog` to set the current catalog.
+
+#### REST catalog
+
+This creates an Iceberg catalog named `rest_catalog` that can be configured using `'catalog-type'='rest'`, which loads tables from a REST catalog:
+
+```sql
+CREATE CATALOG rest_catalog WITH (
+ 'type'='iceberg',
+ 'catalog-type'='rest',
+ 'uri'='https://localhost/'
+);
+```
+
+The following properties can be set if using the REST catalog:
+
+* `uri`: The URL to the REST Catalog (Required)
+* `credential`: A credential to exchange for a token in the OAuth2 client credentials flow (Optional)
+* `token`: A token which will be used to interact with the server (Optional)
+
+#### Custom catalog
+
+Flink also supports loading a custom Iceberg `Catalog` implementation by specifying the `catalog-impl` property:
+
+```sql
+CREATE CATALOG my_catalog WITH (
+ 'type'='iceberg',
+ 'catalog-impl'='com.my.custom.CatalogImpl',
+ 'my-additional-catalog-config'='my-value'
+);
+```
+
+#### Create through YAML config
+
+Catalogs can be registered in `sql-client-defaults.yaml` before starting the SQL client.
+
+```yaml
+catalogs:
+ - name: my_catalog
+ type: iceberg
+ catalog-type: hadoop
+ warehouse: hdfs://nn:8020/warehouse/path
+```
+
+#### Create through SQL Files
+
+The Flink SQL Client supports the `-i` startup option, which executes an initialization SQL file to set up the environment when the SQL Client starts.
+
+```sql
+-- define available catalogs
+CREATE CATALOG hive_catalog WITH (
+ 'type'='iceberg',
+ 'catalog-type'='hive',
+ 'uri'='thrift://localhost:9083',
+ 'warehouse'='hdfs://nn:8020/warehouse/path'
+);
+
+USE CATALOG hive_catalog;
+```
+
+Use the `-i <init.sql>` option to initialize the SQL Client session:
+
+```bash
+/path/to/bin/sql-client.sh -i /path/to/init.sql
+```
+
+### `CREATE DATABASE`
+
+By default, Iceberg uses the `default` database in Flink. Use the following example to create a separate database so that tables are not created under the `default` database:
+
+```sql
+CREATE DATABASE iceberg_db;
+USE iceberg_db;
+```
+
+### `CREATE TABLE`
+
+```sql
+CREATE TABLE `hive_catalog`.`default`.`sample` (
+ id BIGINT COMMENT 'unique id',
+ data STRING
+);
+```
+
+Table create commands support the commonly used [Flink create clauses](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/) including:
+
+* `PARTITIONED BY (column1, column2, ...)` to configure partitioning; Flink does not yet support hidden partitioning.
+* `COMMENT 'table document'` to set a table description.
+* `WITH ('key'='value', ...)` to set [table configuration](../configuration) which will be stored in Iceberg table properties.
+
+Currently, computed columns, primary keys, watermark definitions, and similar clauses are not supported.
+
+### `PARTITIONED BY`
+
+To create a partitioned table, use `PARTITIONED BY`:
+
+```sql
+CREATE TABLE `hive_catalog`.`default`.`sample` (
+ id BIGINT COMMENT 'unique id',
+ data STRING
+) PARTITIONED BY (data);
+```
+
+Iceberg supports hidden partitioning, but Flink does not support partitioning by a function on columns, so hidden partitioning cannot be expressed in Flink DDL.
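
To make the distinction concrete, the sketch below contrasts two kinds of partition value. The first is the identity value that `PARTITIONED BY (data)` produces. The second is computed by a function, using Iceberg's `day` transform (whole days since 1970-01-01 UTC) as the example of hidden partitioning. `PartitionValues` is a hypothetical helper, not an Iceberg class.

```java
import java.time.Instant;

// Illustration only: identity partitioning (expressible in Flink DDL) versus a
// day transform on a timestamp (Iceberg hidden partitioning, not expressible in Flink DDL).
final class PartitionValues {
  // PARTITIONED BY (data): the partition value is the column value itself.
  static String identity(String data) {
    return data;
  }

  // Day transform: whole days since the Unix epoch in UTC, rounding toward negative infinity.
  static int day(Instant ts) {
    return (int) Math.floorDiv(ts.getEpochSecond(), 86_400L);
  }
}
```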
+
+### `CREATE TABLE LIKE`
+
+To create a table with the same schema, partitioning, and table properties as another table, use `CREATE TABLE LIKE`.
+
+```sql
+CREATE TABLE `hive_catalog`.`default`.`sample` (
+ id BIGINT COMMENT 'unique id',
+ data STRING
+);
+
+CREATE TABLE `hive_catalog`.`default`.`sample_like` LIKE `hive_catalog`.`default`.`sample`;
+```
+
+For more details, refer to the [Flink `CREATE TABLE` documentation](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/).
+
+
+### `ALTER TABLE`
+
+Iceberg only supports altering table properties:
+
+```sql
+ALTER TABLE `hive_catalog`.`default`.`sample` SET ('write.format.default'='avro');
+```
+
+### `ALTER TABLE .. RENAME TO`
+
+```sql
+ALTER TABLE `hive_catalog`.`default`.`sample` RENAME TO `hive_catalog`.`default`.`new_sample`;
+```
+
+### `DROP TABLE`
+
+To delete a table, run:
+
+```sql
+DROP TABLE `hive_catalog`.`default`.`sample`;
+```
diff --git a/docs/content/flink-getting-started.md b/docs/content/flink-getting-started.md
index b60e2996..677d628c 100644
--- a/docs/content/flink-getting-started.md
+++ b/docs/content/flink-getting-started.md
@@ -1,6 +1,8 @@
---
-title: "Enabling Iceberg in Flink"
+title: "Flink Getting Started"
url: flink
+aliases:
+ - "flink/flink"
menu:
main:
parent: Flink
@@ -27,22 +29,22 @@ menu:
Apache Iceberg supports both [Apache Flink](https://flink.apache.org/)'s DataStream API and Table API. See the [Multi-Engine Support#apache-flink](https://iceberg.apache.org/multi-engine-support/#apache-flink) page for the integration of Apache Flink.
-| Feature support | Flink | Notes |
-| ----------------------------------------------------------- | ----- | ------------------------------------------------------------ |
-| [SQL create catalog](#creating-catalogs-and-using-catalogs) | ✔️ | |
-| [SQL create database](#create-database) | ✔️ | |
-| [SQL create table](#create-table) | ✔️ | |
-| [SQL create table like](#create-table-like) | ✔️ | |
-| [SQL alter table](#alter-table) | ✔️ | Only support altering table properties, column and partition changes are not supported |
-| [SQL drop_table](#drop-table) | ✔️ | |
-| [SQL select](#querying-with-sql) | ✔️ | Support both streaming and batch mode |
-| [SQL insert into](#insert-into) | ✔️ ️ | Support both streaming and batch mode |
-| [SQL insert overwrite](#insert-overwrite) | ✔️ ️ | |
-| [DataStream read](#reading-with-datastream) | ✔️ ️ | |
-| [DataStream append](#appending-data) | ✔️ ️ | |
-| [DataStream overwrite](#overwrite-data) | ✔️ ️ | |
-| [Metadata tables](#inspecting-tables) | ️ | Support Java API but does not support Flink SQL |
-| [Rewrite files action](#rewrite-files-action) | ✔️ ️ | |
+| Feature support | Flink | Notes |
+| ----------------------------------------------------------- |-------|----------------------------------------------------------------------------------------|
+| [SQL create catalog](#adding-catalogs) | ✔️ | |
+| [SQL create database](#create-database) | ✔️ | |
+| [SQL create table](#create-table) | ✔️ | |
+| [SQL create table like](#create-table-like) | ✔️ | |
+| [SQL alter table](#alter-table) | ✔️ | Only altering table properties is supported; column and partition changes are not supported |
+| [SQL drop table](#drop-table) | ✔️ | |
+| [SQL select](#querying-with-sql) | ✔️ | Supports both streaming and batch mode |
+| [SQL insert into](#insert-into) | ✔️ | Supports both streaming and batch mode |
+| [SQL insert overwrite](#insert-overwrite) | ✔️ | |
+| [DataStream read](#reading-with-datastream) | ✔️ | |
+| [DataStream append](#appending-data) | ✔️ | |
+| [DataStream overwrite](#overwrite-data) | ✔️ | |
+| [Metadata tables](#inspecting-tables) | ✔️ | |
+| [Rewrite files action](#rewrite-files-action) | ✔️ | |
## Preparation when using Flink SQL Client
@@ -108,7 +110,7 @@ wget ${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_V
## Flink's Python API
{{< hint info >}}
-PyFlink 1.6.1 [does not work on OSX with a M1 cpu](https://issues.apache.org/jira/browse/FLINK-28786)
+PyFlink 1.6.1 [does not work on OSX with an M1 CPU](https://issues.apache.org/jira/browse/FLINK-28786)
{{< /hint >}}
Install the Apache Flink dependency using `pip`:
@@ -173,14 +175,14 @@ Run a query:
For more details, please refer to the [Python Table API](https://ci.apache.org/projects/flink/flink-docs-release-1.16/docs/dev/python/table/intro_to_table_api/).
-## Creating catalogs and using catalogs.
+## Adding catalogs
Flink supports creating catalogs using Flink SQL.
### Catalog Configuration
A catalog is created and named by executing the following query (replace `<catalog_name>` with your catalog name and
-`<config_key>`=`<config_value>` with catalog implementation config):
+`<config_key>`=`<config_value>` with catalog implementation config):
```sql
CREATE CATALOG <catalog_name> WITH (
@@ -219,103 +221,9 @@ The following properties can be set if using the Hive catalog:
* `clients`: The Hive metastore client pool size, default value is 2. (Optional)
* `warehouse`: The Hive warehouse location. Specify this path if you neither set `hive-conf-dir` to a directory containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to the classpath.
* `hive-conf-dir`: Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. If both `hive-conf-dir` and `warehouse` are set when creating the Iceberg catalog, the value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or the Hive configuration file on the classpath) is overwritten with the `warehouse` value.
-* `hadoop-conf-dir`: Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values.
+* `hadoop-conf-dir`: Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values.
-### Hadoop catalog
-
-Iceberg also supports a directory-based catalog in HDFS that can be configured using `'catalog-type'='hadoop'`:
-
-```sql
-CREATE CATALOG hadoop_catalog WITH (
- 'type'='iceberg',
- 'catalog-type'='hadoop',
- 'warehouse'='hdfs://nn:8020/warehouse/path',
- 'property-version'='1'
-);
-```
-
-The following properties can be set if using the Hadoop catalog:
-
-* `warehouse`: The HDFS directory to store metadata files and data files. (Required)
-
-Execute the sql command `USE CATALOG hadoop_catalog` to set the current catalog.
-
-### REST catalog
-
-This creates an iceberg catalog named `rest_catalog` that can be configured using `'catalog-type'='rest'`, which loads tables from a REST catalog:
-
-```sql
-CREATE CATALOG rest_catalog WITH (
- 'type'='iceberg',
- 'catalog-type'='rest',
- 'uri'='https://localhost/'
-);
-```
-
-The following properties can be set if using the REST catalog:
-
-* `uri`: The URL to the REST Catalog (Required)
-* `credential`: A credential to exchange for a token in the OAuth2 client credentials flow (Optional)
-* `token`: A token which will be used to interact with the server (Optional)
-
-### Custom catalog
-
-Flink also supports loading a custom Iceberg `Catalog` implementation by specifying the `catalog-impl` property:
-
-```sql
-CREATE CATALOG my_catalog WITH (
- 'type'='iceberg',
- 'catalog-impl'='com.my.custom.CatalogImpl',
- 'my-additional-catalog-config'='my-value'
-);
-```
-
-### Create through YAML config
-
-Catalogs can be registered in `sql-client-defaults.yaml` before starting the SQL client.
-
-```yaml
-catalogs:
- - name: my_catalog
- type: iceberg
- catalog-type: hadoop
- warehouse: hdfs://nn:8020/warehouse/path
-```
-
-### Create through SQL Files
-
-The Flink SQL Client supports the `-i` startup option to execute an initialization SQL file to set up environment when starting up the SQL Client.
-
-```sql
--- define available catalogs
-CREATE CATALOG hive_catalog WITH (
- 'type'='iceberg',
- 'catalog-type'='hive',
- 'uri'='thrift://localhost:9083',
- 'warehouse'='hdfs://nn:8020/warehouse/path'
-);
-
-USE CATALOG hive_catalog;
-```
-
-Using `-i <init.sql>` option to initialize SQL Client session:
-
-```bash
-/path/to/bin/sql-client.sh -i /path/to/init.sql
-```
-
-## DDL commands
-
-### `CREATE DATABASE`
-
-By default, Iceberg will use the `default` database in Flink. Using the following example to create a separate database in order to avoid creating tables under the `default` database:
-
-```sql
-CREATE DATABASE iceberg_db;
-USE iceberg_db;
-```
-
-### `CREATE TABLE`
+## Creating a table
```sql
CREATE TABLE `hive_catalog`.`default`.`sample` (
@@ -324,121 +232,7 @@ CREATE TABLE `hive_catalog`.`default`.`sample` (
);
```
-Table create commands support the commonly used [Flink create clauses](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/) including:
-
-* `PARTITION BY (column1, column2, ...)` to configure partitioning, Flink does not yet support hidden partitioning.
-* `COMMENT 'table document'` to set a table description.
-* `WITH ('key'='value', ...)` to set [table configuration](../configuration) which will be stored in Iceberg table properties.
-
-Currently, it does not support computed column, primary key and watermark definition etc.
-
-### `PARTITIONED BY`
-
-To create a partitioned table, use `PARTITIONED BY`:
-
-```sql
-CREATE TABLE `hive_catalog`.`default`.`sample` (
- id BIGINT COMMENT 'unique id',
- data STRING
-) PARTITIONED BY (data);
-```
-
-Iceberg supports hidden partitioning, but Flink does not support partitioning by a function on columns, so hidden partitions cannot be defined in Flink DDL.
-
-### `CREATE TABLE LIKE`
-
-To create a table with the same schema, partitioning, and table properties as another table, use `CREATE TABLE LIKE`.
-
-```sql
-CREATE TABLE `hive_catalog`.`default`.`sample` (
- id BIGINT COMMENT 'unique id',
- data STRING
-);
-
-CREATE TABLE `hive_catalog`.`default`.`sample_like` LIKE `hive_catalog`.`default`.`sample`;
-```
-
-For more details, refer to the [Flink `CREATE TABLE` documentation](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/).
-
-
-### `ALTER TABLE`
-
-Iceberg only supports altering table properties:
-
-```sql
-ALTER TABLE `hive_catalog`.`default`.`sample` SET ('write.format.default'='avro');
-```
-
-### `ALTER TABLE .. RENAME TO`
-
-```sql
-ALTER TABLE `hive_catalog`.`default`.`sample` RENAME TO `hive_catalog`.`default`.`new_sample`;
-```
-
-### `DROP TABLE`
-
-To delete a table, run:
-
-```sql
-DROP TABLE `hive_catalog`.`default`.`sample`;
-```
-
-## Querying with SQL
-
-Iceberg supports both streaming and batch reads in Flink. Execute the following SQL command to switch the execution mode from `streaming` to `batch`, and vice versa:
-
-```sql
--- Execute the flink job in streaming mode for current session context
-SET execution.runtime-mode = streaming;
-
--- Execute the flink job in batch mode for current session context
-SET execution.runtime-mode = batch;
-```
-
-### Flink batch read
-
-Submit a Flink __batch__ job using the following statements:
-
-```sql
--- Execute the flink job in batch mode for current session context
-SET execution.runtime-mode = batch;
-SELECT * FROM sample;
-```
-
-### Flink streaming read
-
-Iceberg supports processing incremental data in Flink streaming jobs, starting from a historical snapshot ID:
-
-```sql
--- Submit the flink job in streaming mode for current session.
-SET execution.runtime-mode = streaming;
-
--- Enable this switch because streaming read SQL passes some job options through Flink SQL hint options.
-SET table.dynamic-table-options.enabled=true;
-
--- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
-SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
-
--- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
-SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
-```
-
-Other options can be set through Flink SQL hint options for streaming jobs; see [read options](#Read-options) for details.
-
-### FLIP-27 source for SQL
-
-Here are the SQL settings for the [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) source. All other SQL settings and options documented above are applicable to the FLIP-27 source.
-
-```sql
--- Opt in the FLIP-27 source. Default is false.
-SET table.exec.iceberg.use-flip27-source = true;
-```
-
-## Writing with SQL
-
-Iceberg supports both `INSERT INTO` and `INSERT OVERWRITE`.
-
-### `INSERT INTO`
+## Writing
To append new data to a table with a Flink streaming job, use `INSERT INTO`:
@@ -447,14 +241,12 @@ INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from other_kafka_table;
```
-### `INSERT OVERWRITE`
-
To replace data in the table with the result of a query, use `INSERT OVERWRITE` in a batch job (Flink streaming jobs do not support `INSERT OVERWRITE`). Overwrites are atomic operations for Iceberg tables.
Partitions that have rows produced by the SELECT query will be replaced, for example:
```sql
-INSERT OVERWRITE sample VALUES (1, 'a');
+INSERT OVERWRITE `hive_catalog`.`default`.`sample` VALUES (1, 'a');
```
Iceberg also supports overwriting given partitions by the `select` values:
@@ -463,191 +255,6 @@ Iceberg also support overwriting given partitions by the `select` values:
INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
```
-For a partitioned Iceberg table, when every partition column is given a value in the `PARTITION` clause, the query inserts into a static partition. If only some partition columns (a prefix of all partition columns) are given values, the query result is written into dynamic partitions.
-For an unpartitioned Iceberg table, `INSERT OVERWRITE` completely overwrites its data.
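The static/dynamic distinction above can be sketched in plain Java. This is an illustration only, not Flink or Iceberg code: `PartitionClauseKind` and `classify` are hypothetical names, and the sketch assumes the `PARTITION` clause lists its columns in the table's partition order.

```java
import java.util.List;

// Hypothetical helper (not part of Iceberg or Flink) that classifies the
// PARTITION clause of an INSERT OVERWRITE statement.
final class PartitionClauseKind {

  enum Kind { UNPARTITIONED, STATIC, DYNAMIC }

  /**
   * @param partitionColumns the table's partition columns, in spec order
   * @param specifiedColumns the columns given a value in the PARTITION clause, in order
   */
  static Kind classify(List<String> partitionColumns, List<String> specifiedColumns) {
    // Assumption: the specified columns must form a prefix of the partition columns.
    boolean isPrefix = specifiedColumns.size() <= partitionColumns.size()
        && partitionColumns.subList(0, specifiedColumns.size()).equals(specifiedColumns);
    if (!isPrefix) {
      throw new IllegalArgumentException(
          "PARTITION clause must set a prefix of the partition columns: " + specifiedColumns);
    }
    if (partitionColumns.isEmpty()) {
      // Unpartitioned table: INSERT OVERWRITE replaces all of its data.
      return Kind.UNPARTITIONED;
    }
    // All partition columns fixed -> one static partition; otherwise the
    // remaining partition values come from the query result (dynamic).
    return specifiedColumns.size() == partitionColumns.size() ? Kind.STATIC : Kind.DYNAMIC;
  }

  public static void main(String[] args) {
    List<String> partitionColumns = List.of("dt", "hour");
    System.out.println(classify(partitionColumns, List.of("dt", "hour"))); // STATIC
    System.out.println(classify(partitionColumns, List.of("dt")));         // DYNAMIC
    System.out.println(classify(List.of(), List.of()));                    // UNPARTITIONED
  }
}
```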
-
-### `UPSERT`
-
-Iceberg supports `UPSERT` based on the primary key when writing data into v2 table format. There are two ways to enable upsert.
-
-1. Enable `UPSERT` mode with the table-level property `write.upsert.enabled`. Here is an example SQL statement that sets the table property when creating a table. It applies to all write paths to this table (batch or streaming) unless overridden by write options as described later.
-
-```sql
-CREATE TABLE `hive_catalog`.`default`.`sample` (
- `id` INT COMMENT 'unique id',
- `data` STRING NOT NULL,
- PRIMARY KEY(`id`) NOT ENFORCED
-) with ('format-version'='2', 'write.upsert.enabled'='true');
-```
-
-2. Enabling `UPSERT` mode with `upsert-enabled` in the [write options](#Write-options) provides more flexibility than a table-level config. Note that you still need to use the v2 table format and specify the primary key when creating the table.
-
-```sql
-INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
-...
-```
-
-{{< hint info >}}
-OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields must be included in the equality fields.
-{{< /hint >}}
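The two rules in the note above can be expressed as a pre-flight check. This is a minimal sketch with hypothetical names (`UpsertCheck`, `validate`); it assumes the equality fields are the table's primary-key columns and that partition fields are identified by their source column names. Iceberg performs its own validation, and its error messages differ.

```java
import java.util.Set;

// Hypothetical pre-flight check (not an Iceberg API) mirroring the two UPSERT rules.
final class UpsertCheck {

  static void validate(boolean overwrite, boolean upsert,
                       Set<String> equalityFields, Set<String> partitionSourceFields) {
    // Rule 1: OVERWRITE and UPSERT are mutually exclusive.
    if (overwrite && upsert) {
      throw new IllegalArgumentException("OVERWRITE and UPSERT cannot be enabled together");
    }
    // Rule 2: in UPSERT mode every partition source field must be an equality field.
    if (upsert && !equalityFields.containsAll(partitionSourceFields)) {
      throw new IllegalArgumentException(
          "In UPSERT mode, partition fields " + partitionSourceFields
              + " must be included in equality fields " + equalityFields);
    }
  }

  public static void main(String[] args) {
    // Accepted: the partition field "data" is part of the equality fields.
    validate(false, true, Set.of("id", "data"), Set.of("data"));
    try {
      // Rejected: the table is partitioned by "data", but only "id" is an equality field.
      validate(false, true, Set.of("id"), Set.of("data"));
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```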
-
-## Reading with DataStream
-
-Iceberg supports streaming and batch reads through the Java API.
-
-### Batch Read
-
-This example reads all records from an Iceberg table and prints them to stdout in a Flink batch job:
-
-```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
-DataStream<RowData> batch = FlinkSource.forRowData()
- .env(env)
- .tableLoader(tableLoader)
- .streaming(false)
- .build();
-
-// Print all records to stdout.
-batch.print();
-
-// Submit and execute this batch read job.
-env.execute("Test Iceberg Batch Read");
-```
-
-### Streaming read
-
-This example reads incremental records starting from snapshot ID '3821550127947089987' and prints them to stdout in a Flink streaming job:
-
-```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
-DataStream<RowData> stream = FlinkSource.forRowData()
- .env(env)
- .tableLoader(tableLoader)
- .streaming(true)
- .startSnapshotId(3821550127947089987L)
- .build();
-
-// Print all records to stdout.
-stream.print();
-
-// Submit and execute this streaming read job.
-env.execute("Test Iceberg Streaming Read");
-```
-
-Other options can be set as well; see [FlinkSource#Builder](../../../javadoc/{{% icebergVersion %}}/org/apache/iceberg/flink/source/FlinkSource.html).
-
-## Reading with DataStream (FLIP-27 source)
-
-[FLIP-27 source interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
-was introduced in Flink 1.12. It aims to solve several shortcomings of the old `SourceFunction`
-streaming source interface. It also unifies the source interfaces for both batch and streaming executions.
-Most source connectors (such as Kafka and file) in the Flink repo have migrated to the FLIP-27 interface.
-Flink is planning to deprecate the old `SourceFunction` interface in the near future.
-
-A FLIP-27 based Flink `IcebergSource` has been added to the `iceberg-flink` module. The FLIP-27 `IcebergSource` is currently an experimental feature.
-
-### Batch Read
-
-This example reads all records from an Iceberg table and prints them to stdout in a Flink batch job:
-
-```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
-
-IcebergSource<RowData> source = IcebergSource.forRowData()
- .tableLoader(tableLoader)
- .assignerFactory(new SimpleSplitAssignerFactory())
- .build();
-
-DataStream<RowData> batch = env.fromSource(
- source,
- WatermarkStrategy.noWatermarks(),
- "My Iceberg Source",
- TypeInformation.of(RowData.class));
-
-// Print all records to stdout.
-batch.print();
-
-// Submit and execute this batch read job.
-env.execute("Test Iceberg Batch Read");
-```
-
-### Streaming read
-
-This example will start the streaming read from the latest table snapshot (inclusive).
-Every 60s, it polls the Iceberg table to discover new append-only snapshots.
-CDC read is not supported yet.
-
-```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
-
-IcebergSource<RowData> source = IcebergSource.forRowData()
-    .tableLoader(tableLoader)
-    .assignerFactory(new SimpleSplitAssignerFactory())
-    .streaming(true)
-    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
-    .monitorInterval(Duration.ofSeconds(60))
-    .build();
-
-DataStream<RowData> stream = env.fromSource(
- source,
- WatermarkStrategy.noWatermarks(),
- "My Iceberg Source",
- TypeInformation.of(RowData.class));
-
-// Print all records to stdout.
-stream.print();
-
-// Submit and execute this streaming read job.
-env.execute("Test Iceberg Streaming Read");
-```
-
-For other options that can be set through the Java API, see
-[IcebergSource#Builder](../../../javadoc/{{% icebergVersion %}}/org/apache/iceberg/flink/source/IcebergSource.html).
-
-### Read as Avro GenericRecord
-
-The FLIP-27 Iceberg source provides `AvroGenericRecordReaderFunction`, which converts
-Flink `RowData` to Avro `GenericRecord`. You can use it to read an
-Iceberg table as an Avro `GenericRecord` DataStream.
-
-Make sure the `flink-avro` jar is included in the classpath.
-The shaded `iceberg-flink-runtime` bundle jar can't be used,
-because the runtime jar shades the Avro package.
-Use the non-shaded `iceberg-flink` jar instead.
-
-```java
-TableLoader tableLoader = ...;
-Table table;
-try (TableLoader loader = tableLoader) {
- loader.open();
- table = loader.loadTable();
-}
-
-AvroGenericRecordReaderFunction readerFunction = AvroGenericRecordReaderFunction.fromTable(table);
-
-IcebergSource<GenericRecord> source =
- IcebergSource.<GenericRecord>builder()
- .tableLoader(tableLoader)
- .readerFunction(readerFunction)
- .assignerFactory(new SimpleSplitAssignerFactory())
- ...
- .build();
-
-DataStream<GenericRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
- "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema));
-```
-
-## Writing with DataStream
-
-Iceberg supports writing to an Iceberg table from different DataStream inputs.
-
-
-### Appending data
-
Flink supports writing `DataStream<RowData>` and `DataStream<Row>` to the Iceberg sink table natively.
```java
@@ -664,407 +271,50 @@ FlinkSink.forRowData(input)
env.execute("Test Iceberg DataStream");
```
-The Iceberg API also allows users to write a generic `DataStream<T>` to an Iceberg table; more examples can be found in this [unit test](https://github.com/apache/iceberg/blob/master/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java).
-
-### Overwrite data
-
-Set the `overwrite` flag in the FlinkSink builder to overwrite the data in existing Iceberg tables:
-
-```java
-StreamExecutionEnvironment env = ...;
-
-DataStream<RowData> input = ... ;
-Configuration hadoopConf = new Configuration();
-TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
-
-FlinkSink.forRowData(input)
- .tableLoader(tableLoader)
- .overwrite(true)
- .append();
-
-env.execute("Test Iceberg DataStream");
-```
-
-### Upsert data
-
-Set the `upsert` flag in the FlinkSink builder to upsert the data in an existing Iceberg table. The table must use the v2 table format and have a primary key.
-
-```java
-StreamExecutionEnvironment env = ...;
-
-DataStream<RowData> input = ... ;
-Configuration hadoopConf = new Configuration();
-TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
-
-FlinkSink.forRowData(input)
- .tableLoader(tableLoader)
- .upsert(true)
- .append();
-
-env.execute("Test Iceberg DataStream");
-```
-
-{{< hint info >}}
-OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields must be included in the equality fields.
-{{< /hint >}}
-
-### Write with Avro GenericRecord
-
-Flink Iceberg sink provides `AvroGenericRecordToRowDataMapper` that converts
-Avro `GenericRecord` to Flink `RowData`. You can use the mapper to write
-Avro GenericRecord DataStream to Iceberg.
-
-Make sure the `flink-avro` jar is included in the classpath.
-The shaded `iceberg-flink-runtime` bundle jar can't be used,
-because the runtime jar shades the Avro package.
-Use the non-shaded `iceberg-flink` jar instead.
-
-```java
-DataStream<org.apache.avro.generic.GenericRecord> dataStream = ...;
-
-Schema icebergSchema = table.schema();
-
-
-// The Avro schema converted from Iceberg schema can't be used
-// due to precision difference between how Iceberg schema (micro)
-// and Flink AvroToRowDataConverters (milli) deal with time type.
-// Instead, use the Avro schema defined directly.
-// See AvroGenericRecordToRowDataMapper Javadoc for more details.
-org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, table.name());
-
-GenericRecordAvroTypeInfo avroTypeInfo = new GenericRecordAvroTypeInfo(avroSchema);
-RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
-
-FlinkSink.builderFor(
- dataStream,
- AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema),
- FlinkCompatibilityUtil.toTypeInfo(rowType))
- .table(table)
- .tableLoader(tableLoader)
- .append();
-```
-
-### Metrics
-
-The following Flink metrics are provided by the Flink Iceberg sink.
-
-Parallel writer metrics are added under the sub group of `IcebergStreamWriter`.
-They should have the following key-value tags.
-* table: full table name (like iceberg.my_db.my_table)
-* subtask_index: writer subtask index starting from 0
-
-| Metric name                | Metric type | Description                                                                                                 |
-| -------------------------- | ----------- | ----------------------------------------------------------------------------------------------------------- |
-| lastFlushDurationMs        | Gauge       | The duration (in milliseconds) that writer subtasks take to flush and upload the files during checkpoint. |
-| flushedDataFiles           | Counter     | Number of data files flushed and uploaded.                                                                  |
-| flushedDeleteFiles         | Counter     | Number of delete files flushed and uploaded.                                                                |
-| flushedReferencedDataFiles | Counter     | Number of data files referenced by the flushed delete files.                                                |
-| dataFilesSizeHistogram     | Histogram   | Histogram distribution of data file sizes (in bytes).                                                       |
-| deleteFilesSizeHistogram   | Histogram   | Histogram distribution of delete file sizes (in bytes).                                                     |
-
-Committer metrics are added under the sub group of `IcebergFilesCommitter`.
-They should have the following key-value tags.
-* table: full table name (like iceberg.my_db.my_table)
-
-| Metric name                             | Metric type | Description                                                                        |
-| --------------------------------------- | ----------- | ---------------------------------------------------------------------------------- |
-| lastCheckpointDurationMs                | Gauge       | The duration (in milliseconds) that the committer operator checkpoints its state. |
-| lastCommitDurationMs                    | Gauge       | The duration (in milliseconds) that the Iceberg table commit takes.               |
-| committedDataFilesCount                 | Counter     | Number of data files committed.                                                    |
-| committedDataFilesRecordCount           | Counter     | Number of records contained in the committed data files.                          |
-| committedDataFilesByteCount             | Counter     | Number of bytes contained in the committed data files.                            |
-| committedDeleteFilesCount               | Counter     | Number of delete files committed.                                                  |
-| committedDeleteFilesRecordCount         | Counter     | Number of records contained in the committed delete files.                        |
-| committedDeleteFilesByteCount           | Counter     | Number of bytes contained in the committed delete files.                          |
-| elapsedSecondsSinceLastSuccessfulCommit | Gauge       | Elapsed time (in seconds) since the last successful Iceberg commit.               |
-
-`elapsedSecondsSinceLastSuccessfulCommit` is an ideal alerting metric
-to detect failed or missing Iceberg commits.
-* The Iceberg commit happens after a successful Flink checkpoint, in the `notifyCheckpointComplete` callback.
-Iceberg commits can fail (for whatever reason) while Flink checkpoints keep succeeding.
-* It can also happen that `notifyCheckpointComplete` is not triggered (for example, because of a bug).
-As a result, no Iceberg commit is attempted.
-
-If the checkpoint interval (and expected Iceberg commit interval) is 5 minutes, set up an alert with a rule like `elapsedSecondsSinceLastSuccessfulCommit > 60 minutes` to detect failed or missing Iceberg commits in the past hour.
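The alert itself belongs in your monitoring system; the sketch below only shows the comparison such a rule performs, assuming the 5-minute checkpoint interval and 60-minute threshold from the example above. `CommitAlert` is a hypothetical name, and the gauge value is assumed to have been read from the committer metrics already.

```java
import java.time.Duration;

// Hypothetical alerting rule for the committer gauge
// elapsedSecondsSinceLastSuccessfulCommit; not part of Iceberg or Flink.
final class CommitAlert {

  static boolean shouldAlert(long elapsedSecondsSinceLastSuccessfulCommit, Duration threshold) {
    // Fire only when the gap strictly exceeds the threshold.
    return elapsedSecondsSinceLastSuccessfulCommit > threshold.getSeconds();
  }

  public static void main(String[] args) {
    Duration checkpointInterval = Duration.ofMinutes(5);
    // Assumed threshold: 12 missed commit intervals, i.e. one hour.
    Duration threshold = checkpointInterval.multipliedBy(12);
    System.out.println(shouldAlert(20 * 60, threshold)); // false
    System.out.println(shouldAlert(61 * 60, threshold)); // true
  }
}
```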
-
-## Options
-### Read options
-
-Flink read options are passed when configuring the Flink IcebergSource:
-
-```java
-IcebergSource.forRowData()
-    .tableLoader(TableLoader.fromCatalog(...))
-    .assignerFactory(new SimpleSplitAssignerFactory())
-    .streaming(true)
-    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
-    .startSnapshotId(3821550127947089987L)
-    .monitorInterval(Duration.ofSeconds(10L)) // or .set("monitor-interval", "10s") or .set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
-    .build();
-For Flink SQL, read options can be passed in via SQL hints like this:
-```sql
-SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
-...
-```
-
-Options can also be passed in via the Flink configuration, which is applied to the current session. Note that not all options support this mode.
-
-```java
-env.getConfig()
- .getConfiguration()
- .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
-...
-```
-
-`Read option` has the highest priority, followed by `Flink configuration` and then `Table property`.
-
-| Read option | Flink configuration | Table property | Default | Description |
-| --------------------------- | --------------------------------------------- | ---------------------------- | -------------------------------- | ------------------------------------------------------------ |
-| snapshot-id | N/A | N/A | null | For time travel in batch mode. Read data from the specified snapshot-id. |
-| case-sensitive | connector.iceberg.case-sensitive | N/A | false | If true, match column name in a case sensitive way. |
-| as-of-timestamp | N/A | N/A | null | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
-| starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots shou [...]
-| start-snapshot-timestamp | N/A | N/A | null | Start to read data from the most recent snapshot as of the given time in milliseconds. |
-| start-snapshot-id | N/A | N/A | null | Start to read data from the specified snapshot-id. |
-| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
-| split-size | connector.iceberg.split-size | read.split.target-size | 128 MB | Target size when combining input splits. |
-| split-lookback              | connector.iceberg.split-lookback              | read.split.planning-lookback | 10                               | Number of bins to consider when combining input splits.      |
-| split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | 4MB | The estimated cost to open a file, used as a minimum weight when combining splits. |
-| streaming | connector.iceberg.streaming | N/A | false | Sets whether the current task runs in streaming or batch mode. |
-| monitor-interval | connector.iceberg.monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
-| include-column-stats | connector.iceberg.include-column-stats | N/A | false | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
-| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
-| limit | connector.iceberg.limit | N/A | -1 | Limited output number of rows. |
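The precedence described above (read option, then Flink configuration, then table property) can be sketched as a lookup chain. `ReadOptionResolver` is hypothetical and not how Iceberg implements option resolution internally; the keys are taken from the `split-size` row of the table above, and a `null` key stands for an `N/A` cell.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration (not an Iceberg API) of the documented precedence:
// read option > Flink configuration > table property > default.
final class ReadOptionResolver {

  static String resolve(
      Map<String, String> readOptions, String readOptionKey,
      Map<String, String> flinkConfig, String flinkConfigKey,
      Map<String, String> tableProperties, String tablePropertyKey,
      String defaultValue) {
    return lookup(readOptions, readOptionKey)
        .or(() -> lookup(flinkConfig, flinkConfigKey))
        .or(() -> lookup(tableProperties, tablePropertyKey))
        .orElse(defaultValue);
  }

  private static Optional<String> lookup(Map<String, String> source, String key) {
    // Map.of(...) maps reject null keys, so skip the lookup for "N/A" keys.
    return key == null ? Optional.empty() : Optional.ofNullable(source.get(key));
  }

  public static void main(String[] args) {
    Map<String, String> tableProperties = Map.of("read.split.target-size", "268435456");
    Map<String, String> flinkConfig = Map.of("connector.iceberg.split-size", "67108864");

    // No read option set: the Flink configuration wins over the table property.
    System.out.println(resolve(
        Map.of(), "split-size",
        flinkConfig, "connector.iceberg.split-size",
        tableProperties, "read.split.target-size",
        "134217728")); // 67108864
  }
}
```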
-
-
-### Write options
-
-Flink write options are passed when configuring the FlinkSink, like this:
-
-```java
-FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
- .table(table)
- .tableLoader(tableLoader)
- .set("write-format", "orc")
- .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
-```
-For Flink SQL, write options can be passed in via SQL hints like this:
-```sql
-INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
-...
-```
-
-| Flink option | Default | Description |
-|------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------|
-| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
-| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
-| upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled |
-| overwrite-enabled      | false                                      | Overwrite the table's data; overwrite mode shouldn't be enabled when configuring to use UPSERT data stream. |
-| distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode |
-| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
-| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
-| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
-
-
-## Inspecting tables
-
-To inspect a table's history, snapshots, and other metadata, Iceberg supports metadata tables.
-
-Metadata tables are identified by adding the metadata table name after the original table name. For example, history for `db.table` is read using `db.table$history`.
-
-### History
-
-To show table history:
-
-```sql
-SELECT * FROM prod.db.table$history;
-```
-
-| made_current_at | snapshot_id | parent_id | is_current_ancestor |
-| ----------------------- | ------------------- | ------------------- | ------------------- |
-| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL | true |
-| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true |
-| 2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false |
-| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true |
-| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true |
-| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true |
-
-{{< hint info >}}
-**This shows a commit that was rolled back.** In this example, snapshot 296410040247533544 and 2999875608062437330 have the same parent snapshot 5179299526185056830. Snapshot 296410040247533544 was rolled back and is *not* an ancestor of the current table state.
-{{< /hint >}}
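The `is_current_ancestor` column can be reproduced by walking `parent_id` links back from the current snapshot. This is a minimal sketch using the IDs from the history table above, assuming the current snapshot is the last one made current (6536733823181975045); `SnapshotAncestry` is a hypothetical name, not an Iceberg class.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Sketch (not an Iceberg API): recompute is_current_ancestor by following
// parent_id links back from the current snapshot.
final class SnapshotAncestry {

  static Set<Long> currentAncestors(Map<Long, Long> parentOf, long currentSnapshotId) {
    Set<Long> ancestors = new HashSet<>();
    Long id = currentSnapshotId;
    // Stop at the root (parent_id is null); add() returning false guards against cycles.
    while (id != null && ancestors.add(id)) {
      id = parentOf.get(id);
    }
    return ancestors;
  }

  public static void main(String[] args) {
    // snapshot_id -> parent_id, copied from the history example.
    Map<Long, Long> parentOf = new LinkedHashMap<>();
    parentOf.put(5781947118336215154L, null);
    parentOf.put(5179299526185056830L, 5781947118336215154L);
    parentOf.put(296410040247533544L, 5179299526185056830L);
    parentOf.put(2999875608062437330L, 5179299526185056830L);
    parentOf.put(8924558786060583479L, 2999875608062437330L);
    parentOf.put(6536733823181975045L, 8924558786060583479L);

    Set<Long> ancestors = currentAncestors(parentOf, 6536733823181975045L);
    // Only the rolled-back snapshot 296410040247533544 prints false.
    for (Long snapshotId : parentOf.keySet()) {
      System.out.println(snapshotId + " is_current_ancestor=" + ancestors.contains(snapshotId));
    }
  }
}
```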
-
-### Metadata Log Entries
-
-To show table metadata log entries:
-
-```sql
-SELECT * from prod.db.table$metadata_log_entries;
-```
-
-| timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number |
-| ----------------------- | ------------------------------------------------------------ | ------------------ | ---------------- | ---------------------- |
-| 2022-07-28 10:43:52.93 | s3://.../table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json | null | null | null |
-| 2022-07-28 10:43:57.487 | s3://.../table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json | 170260833677645300 | 0 | 1 |
-| 2022-07-28 10:43:58.25 | s3://.../table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json | 958906493976709774 | 0 | 2 |
-
-### Snapshots
-
-To show the valid snapshots for a table:
-
-```sql
-SELECT * FROM prod.db.table$snapshots;
-```
-
-| committed_at | snapshot_id | parent_id | operation | manifest_list | summary |
-| ----------------------- | -------------- | --------- | --------- | -------------------------------------------------- | ------------------------------------------------------------ |
-| 2019-02-08 03:29:51.215 | 57897183625154 | null | append | s3://.../table/metadata/snap-57897183625154-1.avro | { added-records -> 2478404, total-records -> 2478404, added-data-files -> 438, total-data-files -> 438, flink.job-id -> 2e274eecb503d85369fb390e8956c813 } |
-
-You can also join snapshots to table history. For example, this query shows table history, with the Flink job ID that wrote each snapshot:
-
-```sql
-select
- h.made_current_at,
- s.operation,
- h.snapshot_id,
- h.is_current_ancestor,
- s.summary['flink.job-id']
-from prod.db.table$history h
-join prod.db.table$snapshots s
- on h.snapshot_id = s.snapshot_id
-order by made_current_at
-```
-
-| made_current_at | operation | snapshot_id | is_current_ancestor | summary[flink.job-id] |
-| ----------------------- | --------- | -------------- | ------------------- | -------------------------------- |
-| 2019-02-08 03:29:51.215 | append | 57897183625154 | true | 2e274eecb503d85369fb390e8956c813 |
-
-### Files
+## Reading
-To show a table's current data files:
+Submit a Flink __batch__ job using the following statements:
```sql
-SELECT * FROM prod.db.table$files;
+-- Execute the flink job in batch mode for current session context
+SET execution.runtime-mode = batch;
+SELECT * FROM `hive_catalog`.`default`.`sample`;
```
-| content | file_path | file_format | spec_id | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id |
-| ------- | ------------------------------------------------------------ | ----------- | ------- | ---------------- | ------------ | ------------------ | ------------------ | ---------------- | ----------------- | ---------------- | --------------- | --------------- | ------------ | ------------- | ------------ | ------------- |
-| 0 | s3://.../table/data/00000-3-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 01} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> c] | [1 -> , 2 -> c] | null | [4] | null | null |
-| 0 | s3://.../table/data/00001-4-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 02} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> b] | [1 -> , 2 -> b] | null | [4] | null | null |
-| 0 | s3://.../table/data/00002-5-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 03} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> a] | [1 -> , 2 -> a] | null | [4] | null | null |
-
-### Manifests
-
-To show a table's current file manifests:
+Iceberg supports processing incremental data in Flink __streaming__ jobs, starting from a historical snapshot ID:
```sql
-SELECT * FROM prod.db.table$manifests;
-```
-
-| path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries |
-| ------------------------------------------------------------ | ------ | ----------------- | ------------------- | ---------------------- | ------------------------- | ------------------------ | ------------------------------------ |
-| s3://.../table/metadata/45b5290b-ee61-4788-b324-b1e2735c0e10-m0.avro | 4479 | 0 | 6668963634911763636 | 8 | 0 | 0 | [[false,null,2019-05-13,2019-05-15]] |
-
-Note:
-
-1. Fields within `partition_summaries` column of the manifests table correspond to `field_summary` structs within [manifest list](../../../spec#manifest-lists), with the following order:
- - `contains_null`
- - `contains_nan`
- - `lower_bound`
- - `upper_bound`
-2. `contains_nan` could return null, which indicates that this information is not available from the file's metadata.
-   This usually occurs when reading from a V1 table, where `contains_nan` is not populated.
+-- Submit the flink job in streaming mode for current session.
+SET execution.runtime-mode = streaming;
-### Partitions
+-- Enable this switch because streaming read SQL passes some job options through Flink SQL hint options.
+SET table.dynamic-table-options.enabled=true;
-To show a table's current partitions:
+-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
+SELECT * FROM `hive_catalog`.`default`.`sample` /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
-```sql
-SELECT * FROM prod.db.table$partitions;
+-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
+SELECT * FROM `hive_catalog`.`default`.`sample` /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
```
-| partition | record_count | file_count | spec_id |
-| -------------- | ------------ | ---------- | ------- |
-| {20211001, 11} | 1 | 1 | 0 |
-| {20211002, 11} | 1 | 1 | 0 |
-| {20211001, 10} | 1 | 1 | 0 |
-| {20211002, 10} | 1 | 1 | 0 |
-
-Note:
-For unpartitioned tables, the partitions table will contain only the record_count and file_count columns.
-
-### All Metadata Tables
-
-These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots.
-
-{{< hint danger >}}
-The "all" metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot.
-{{< /hint >}}
-
-#### All Data Files
-
-To show all of the table's data files and each file's metadata:
+SQL is also the recommended way to inspect tables. To view all of the snapshots in a table, use the snapshots metadata table:
```sql
-SELECT * FROM prod.db.table$all_data_files;
+SELECT * FROM `hive_catalog`.`default`.`sample$snapshots`;
```
-| content | file_path | file_format | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id |
-| ------- | ------------------------------------------------------------ | ----------- | ---------- | ------------ | ------------------ | ------------------ | ------------------ | ----------------- | ---------------- | ----------------------- | ----------------------- | ------------ | ------------- | ------------ | ------------- |
-| 0 | s3://.../dt=20210102/00000-0-756e2512-49ae-45bb-aae3-c0ca475e7879-00001.parquet | PARQUET | {20210102} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210102} | {1 -> 2, 2 -> 20210102} | null | [4] | null | 0 |
-| 0 | s3://.../dt=20210103/00000-0-26222098-032f-472b-8ea5-651a55b21210-00001.parquet | PARQUET | {20210103} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210103} | {1 -> 3, 2 -> 20210103} | null | [4] | null | 0 |
-| 0 | s3://.../dt=20210104/00000-0-a3bb1927-88eb-4f1c-bc6e-19076b0d952e-00001.parquet | PARQUET | {20210104} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210104} | {1 -> 3, 2 -> 20210104} | null | [4] | null | 0 |
-
-#### All Manifests
-
-To show all of the table's manifest files:
+Iceberg supports streaming and batch reads through the Java API:
-```sql
-SELECT * FROM prod.db.table$all_manifests;
```
-
-| path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries |
-| ------------------------------------------------------------ | ------ | ----------------- | ------------------- | ---------------------- | ------------------------- | ------------------------ | ------------------------------------ |
-| s3://.../metadata/a85f78c5-3222-4b37-b7e4-faf944425d48-m0.avro | 6376 | 0 | 6272782676904868561 | 2 | 0 | 0 | [{false, false, 20210101, 20210101}] |
-
-Note:
-
-1. Fields within `partition_summaries` column of the manifests table correspond to `field_summary` structs within [manifest list](../../../spec#manifest-lists), with the following order:
- - `contains_null`
- - `contains_nan`
- - `lower_bound`
- - `upper_bound`
-2. `contains_nan` could return null, which indicates that this information is not available from the file's metadata.
- This usually occurs when reading from V1 table, where `contains_nan` is not populated.
-
-### References
-
-To show a table's known snapshot references:
-
-```sql
-SELECT * FROM prod.db.table$refs;
+DataStream<RowData> batch = FlinkSource.forRowData()
+ .env(env)
+ .tableLoader(tableLoader)
+ .streaming(false)
+ .build();
```
-| name | type | snapshot_id | max_reference_age_in_ms | min_snapshots_to_keep | max_snapshot_age_in_ms |
-| ------- | ------ | ------------------- | ----------------------- | --------------------- | ---------------------- |
-| main | BRANCH | 4686954189838128572 | 10 | 20 | 30 |
-| testTag | TAG | 4686954189838128572 | 10 | null | null |
-## Rewrite files action.
-
-Iceberg provides API to rewrite small files into large files by submitting flink batch job. The behavior of this flink action is the same as the spark's [rewriteDataFiles](../maintenance/#compact-data-files).
-
-```java
-import org.apache.iceberg.flink.actions.Actions;
-
-TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
-Table table = tableLoader.loadTable();
-RewriteDataFilesActionResult result = Actions.forTable(table)
- .rewriteDataFiles()
- .execute();
-```
-For more doc about options of the rewrite files action, please see [RewriteDataFilesAction](../../../javadoc/{{% icebergVersion %}}/org/apache/iceberg/flink/actions/RewriteDataFilesAction.html)
## Type conversion
@@ -1075,7 +325,7 @@ Iceberg's integration for Flink automatically converts between Flink and Iceberg
Flink types are converted to Iceberg types according to the following table:
| Flink | Iceberg | Notes |
-|----------------- |----------------------------|---------------|
+| ------------------- | -------------------------- | ------------- |
| boolean | boolean | |
| tinyint | integer | |
| smallint | integer | |
@@ -1111,7 +361,7 @@ Flink types are converted to Iceberg types according to the following table:
Iceberg types are converted to Flink types according to the following table:
| Iceberg | Flink |
-|----------------------------|-----------------------|
+| -------------------------- | --------------------- |
| boolean | boolean |
| struct | row |
| list | array |
@@ -1138,3 +388,4 @@ There are some features that are do not yet supported in the current Flink Icebe
* Don't support creating iceberg table with computed column.
* Don't support creating iceberg table with watermark.
* Don't support adding columns, removing columns, renaming columns, changing columns. [FLINK-19062](https://issues.apache.org/jira/browse/FLINK-19062) is tracking this.
+*
\ No newline at end of file
diff --git a/docs/content/flink-queries.md b/docs/content/flink-queries.md
new file mode 100644
index 00000000..9afabe28
--- /dev/null
+++ b/docs/content/flink-queries.md
@@ -0,0 +1,450 @@
+---
+title: "Flink Queries"
+url: flink-queries
+aliases:
+ - "flink/flink-queries"
+menu:
+ main:
+ parent: Flink
+ weight: 300
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements. See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+# Flink Queries
+
+Iceberg supports streaming and batch reads with [Apache Flink](https://flink.apache.org/)'s DataStream API and Table API.
+
+## Reading with SQL
+
+Iceberg supports both streaming and batch reads in Flink. Execute the following SQL commands to switch the execution mode between `streaming` and `batch`:
+
+```sql
+-- Execute the flink job in streaming mode for current session context
+SET execution.runtime-mode = streaming;
+
+-- Execute the flink job in batch mode for current session context
+SET execution.runtime-mode = batch;
+```
+
+### Flink batch read
+
+Submit a Flink __batch__ job using the following statements:
+
+```sql
+-- Execute the flink job in batch mode for current session context
+SET execution.runtime-mode = batch;
+SELECT * FROM sample;
+```
+
+### Flink streaming read
+
+Iceberg supports processing incremental data in Flink streaming jobs, starting from a historical snapshot ID:
+
+```sql
+-- Submit the flink job in streaming mode for current session.
+SET execution.runtime-mode = streaming;
+
+-- Enable dynamic table options, because streaming read SQL passes job options through Flink SQL hints.
+SET table.dynamic-table-options.enabled=true;
+
+-- Read all the records from the current Iceberg snapshot, and then read incremental data starting from that snapshot.
+SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
+
+-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
+SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
+```
+
+Several options can be set through Flink SQL hints for streaming jobs; see [read options](#read-options) for details.
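
The exclusive `start-snapshot-id` semantics can be illustrated with a small, stdlib-only Java sketch. It is not Iceberg's implementation: it assumes a hypothetical `parentOf` map from each snapshot ID to its parent ID, walks back from the current snapshot to the start snapshot, and returns the snapshots committed after the start snapshot, oldest first. The start snapshot itself is left out, matching the comment in the query above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating exclusive start-snapshot-id semantics;
// it is not part of the Iceberg API.
final class IncrementalSnapshots {
  private IncrementalSnapshots() {}

  /**
   * Returns the snapshot IDs committed after startId (exclusive) up to and
   * including currentId, oldest first. parentOf maps a snapshot ID to its
   * parent ID; the first snapshot of the table has no entry.
   */
  static List<Long> after(Map<Long, Long> parentOf, long startId, long currentId) {
    List<Long> newerFirst = new ArrayList<>();
    Long id = currentId;
    // Walk the parent chain until we reach the start snapshot (not included).
    while (id != null && id != startId) {
      newerFirst.add(id);
      id = parentOf.get(id);
    }
    if (id == null) {
      throw new IllegalArgumentException(
          "Snapshot " + startId + " is not an ancestor of " + currentId);
    }
    Collections.reverse(newerFirst);
    return newerFirst;
  }
}
```

If the start snapshot is also the current snapshot, the result is empty, because nothing has been committed after it yet.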
+
+### FLIP-27 source for SQL
+
+Here are the SQL settings for the [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) source. All other SQL settings and options documented above are applicable to the FLIP-27 source.
+
+```sql
+-- Opt in to the FLIP-27 source. Default is false.
+SET table.exec.iceberg.use-flip27-source = true;
+```
+
+## Reading with DataStream
+
+Iceberg supports streaming and batch reads through the Java API.
+
+### Batch Read
+
+This example reads all records from an Iceberg table and prints them to stdout in a Flink batch job:
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+DataStream<RowData> batch = FlinkSource.forRowData()
+ .env(env)
+ .tableLoader(tableLoader)
+ .streaming(false)
+ .build();
+
+// Print all records to stdout.
+batch.print();
+
+// Submit and execute this batch read job.
+env.execute("Test Iceberg Batch Read");
+```
+
+### Streaming read
+
+This example reads incremental records starting from snapshot ID `3821550127947089987` and prints them to stdout in a Flink streaming job:
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+DataStream<RowData> stream = FlinkSource.forRowData()
+ .env(env)
+ .tableLoader(tableLoader)
+ .streaming(true)
+ .startSnapshotId(3821550127947089987L)
+ .build();
+
+// Print all records to stdout.
+stream.print();
+
+// Submit and execute this streaming read job.
+env.execute("Test Iceberg Streaming Read");
+```
+
+For other options, see [FlinkSource#Builder](../../../javadoc/{{% icebergVersion %}}/org/apache/iceberg/flink/source/FlinkSource.html).
+
+## Reading with DataStream (FLIP-27 source)
+
+[FLIP-27 source interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
+was introduced in Flink 1.12. It aims to solve several shortcomings of the old `SourceFunction`
+streaming source interface. It also unifies the source interfaces for both batch and streaming executions.
+Most source connectors (like Kafka, file) in the Flink repo have migrated to the FLIP-27 interface.
+Flink is planning to deprecate the old `SourceFunction` interface in the near future.
+
+A FLIP-27 based Flink `IcebergSource` has been added to the `iceberg-flink` module. It is currently an experimental feature.
+
+### Batch Read
+
+This example reads all records from an Iceberg table and prints them to stdout in a Flink batch job:
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+
+IcebergSource<RowData> source = IcebergSource.forRowData()
+ .tableLoader(tableLoader)
+ .assignerFactory(new SimpleSplitAssignerFactory())
+ .build();
+
+DataStream<RowData> batch = env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
+ "My Iceberg Source",
+ TypeInformation.of(RowData.class));
+
+// Print all records to stdout.
+batch.print();
+
+// Submit and execute this batch read job.
+env.execute("Test Iceberg Batch Read");
+```
+
+### Streaming read
+
+This example starts the streaming read from the latest table snapshot (inclusive).
+Every 60 seconds, it polls the Iceberg table to discover new append-only snapshots.
+CDC read is not supported yet.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+
+IcebergSource<RowData> source = IcebergSource.forRowData()
+ .tableLoader(tableLoader)
+ .assignerFactory(new SimpleSplitAssignerFactory())
+ .streaming(true)
+ .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+ .monitorInterval(Duration.ofSeconds(60))
+ .build();
+
+DataStream<RowData> stream = env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
+ "My Iceberg Source",
+ TypeInformation.of(RowData.class));
+
+// Print all records to stdout.
+stream.print();
+
+// Submit and execute this streaming read job.
+env.execute("Test Iceberg Streaming Read");
+```
+
+For other options available through the Java API, see
+[IcebergSource#Builder](../../../javadoc/{{% icebergVersion %}}/org/apache/iceberg/flink/source/IcebergSource.html).
+
+### Read as Avro GenericRecord
+
+The FLIP-27 Iceberg source provides `AvroGenericRecordReaderFunction`, which converts
+Flink `RowData` to Avro `GenericRecord`. You can use it to read an
+Iceberg table as an Avro `GenericRecord` DataStream.
+
+Make sure the `flink-avro` jar is included in the classpath.
+The shaded `iceberg-flink-runtime` bundle jar can't be used,
+because it shades the Avro package.
+Use the non-shaded `iceberg-flink` jar instead.
+
+```java
+TableLoader tableLoader = ...;
+Table table;
+try (TableLoader loader = tableLoader) {
+ loader.open();
+ table = loader.loadTable();
+}
+
+AvroGenericRecordReaderFunction readerFunction = AvroGenericRecordReaderFunction.fromTable(table);
+
+IcebergSource<GenericRecord> source =
+ IcebergSource.<GenericRecord>builder()
+ .tableLoader(tableLoader)
+ .readerFunction(readerFunction)
+ .assignerFactory(new SimpleSplitAssignerFactory())
+ ...
+ .build();
+
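+// Avro schema of the produced records, converted from the Iceberg table schema.
+org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(table.schema(), table.name());
+
+DataStream<GenericRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
+ "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema));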
+```
+
+## Options
+
+### Read options
+
+Flink read options are passed when configuring the Flink IcebergSource:
+
+```java
+IcebergSource.forRowData()
+ .tableLoader(TableLoader.fromCatalog(...))
+ .assignerFactory(new SimpleSplitAssignerFactory())
+ .streaming(true)
+ .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
+ .startSnapshotId(3821550127947089987L)
+ .monitorInterval(Duration.ofSeconds(10)) // or .set("monitor-interval", "10s") or .set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
+ .build();
+```
+
+For Flink SQL, read options can be passed in via SQL hints like this:
+
+```sql
+SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
+...
+```
+
+Options can also be passed in via Flink configuration, which applies to the current session. Note that not all options support this mode.
+
+```java
+env.getConfig()
+ .getConfiguration()
+ .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
+...
+```
+
+Check out all the options here: [read-options](/flink-configuration#read-options)
+
+## Inspecting tables
+
+To inspect a table's history, snapshots, and other metadata, Iceberg supports metadata tables.
+
+Metadata tables are identified by adding the metadata table name after the original table name. For example, history for `db.table` is read using `db.table$history`.
+
+### History
+
+To show table history:
+
+```sql
+SELECT * FROM prod.db.table$history;
+```
+
+| made_current_at | snapshot_id | parent_id | is_current_ancestor |
+| ----------------------- | ------------------- | ------------------- | ------------------- |
+| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL | true |
+| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true |
+| 2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false |
+| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true |
+| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true |
+| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true |
+
+{{< hint info >}}
+**This shows a commit that was rolled back.** In this example, snapshots 296410040247533544 and 2999875608062437330 have the same parent snapshot 5179299526185056830. Snapshot 296410040247533544 was rolled back and is *not* an ancestor of the current table state.
+{{< /hint >}}
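
The `is_current_ancestor` column can be understood as reachability through `parent_id` links. The following Java sketch (a hypothetical helper, not part of the Iceberg API) collects the current snapshot and all of its ancestors. Applied to the history rows above with `6536733823181975045` as the current snapshot, it includes every snapshot except the rolled-back `296410040247533544`.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: derives the set of snapshots for which
// is_current_ancestor would be true, given parent_id links.
final class CurrentAncestors {
  private CurrentAncestors() {}

  /** Returns currentId plus every snapshot reachable by following parent IDs. */
  static Set<Long> of(Map<Long, Long> parentOf, long currentId) {
    Set<Long> ancestors = new HashSet<>();
    Long id = currentId;
    // Set.add returns false on a repeat, which also guards against cycles.
    while (id != null && ancestors.add(id)) {
      id = parentOf.get(id);
    }
    return ancestors;
  }
}
```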
+
+### Metadata Log Entries
+
+To show table metadata log entries:
+
+```sql
+SELECT * from prod.db.table$metadata_log_entries;
+```
+
+| timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number |
+| ----------------------- | ------------------------------------------------------------ | ------------------ | ---------------- | ---------------------- |
+| 2022-07-28 10:43:52.93 | s3://.../table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json | null | null | null |
+| 2022-07-28 10:43:57.487 | s3://.../table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json | 170260833677645300 | 0 | 1 |
+| 2022-07-28 10:43:58.25 | s3://.../table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json | 958906493976709774 | 0 | 2 |
+
+### Snapshots
+
+To show the valid snapshots for a table:
+
+```sql
+SELECT * FROM prod.db.table$snapshots;
+```
+
+| committed_at | snapshot_id | parent_id | operation | manifest_list | summary |
+| ----------------------- | -------------- | --------- | --------- | -------------------------------------------------- | ------------------------------------------------------------ |
+| 2019-02-08 03:29:51.215 | 57897183625154 | null | append | s3://.../table/metadata/snap-57897183625154-1.avro | { added-records -> 2478404, total-records -> 2478404, added-data-files -> 438, total-data-files -> 438, flink.job-id -> 2e274eecb503d85369fb390e8956c813 } |
+
+You can also join snapshots to table history. For example, this query shows table history with the Flink job ID that wrote each snapshot:
+
+```sql
+select
+ h.made_current_at,
+ s.operation,
+ h.snapshot_id,
+ h.is_current_ancestor,
+ s.summary['flink.job-id']
+from prod.db.table$history h
+join prod.db.table$snapshots s
+ on h.snapshot_id = s.snapshot_id
+order by made_current_at
+```
+
+| made_current_at | operation | snapshot_id | is_current_ancestor | summary[flink.job-id] |
+| ----------------------- | --------- | -------------- | ------------------- | -------------------------------- |
+| 2019-02-08 03:29:51.215 | append | 57897183625154 | true | 2e274eecb503d85369fb390e8956c813 |
+
+### Files
+
+To show a table's current data files:
+
+```sql
+SELECT * FROM prod.db.table$files;
+```
+
+| content | file_path | file_format | spec_id | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id |
+| ------- | ------------------------------------------------------------ | ----------- | ------- | ---------------- | ------------ | ------------------ | ------------------ | ---------------- | ----------------- | ---------------- | --------------- | --------------- | ------------ | ------------- | ------------ | ------------- |
+| 0 | s3://.../table/data/00000-3-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 01} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> c] | [1 -> , 2 -> c] | null | [4] | null | null |
+| 0 | s3://.../table/data/00001-4-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 02} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> b] | [1 -> , 2 -> b] | null | [4] | null | null |
+| 0 | s3://.../table/data/00002-5-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 03} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> a] | [1 -> , 2 -> a] | null | [4] | null | null |
+
+### Manifests
+
+To show a table's current file manifests:
+
+```sql
+SELECT * FROM prod.db.table$manifests;
+```
+
+| path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries |
+| ------------------------------------------------------------ | ------ | ----------------- | ------------------- | ---------------------- | ------------------------- | ------------------------ | ------------------------------------ |
+| s3://.../table/metadata/45b5290b-ee61-4788-b324-b1e2735c0e10-m0.avro | 4479 | 0 | 6668963634911763636 | 8 | 0 | 0 | [[false,null,2019-05-13,2019-05-15]] |
+
+Note:
+
+1. Fields within `partition_summaries` column of the manifests table correspond to `field_summary` structs within [manifest list](../../../spec#manifest-lists), with the following order:
+ - `contains_null`
+ - `contains_nan`
+ - `lower_bound`
+ - `upper_bound`
+2. `contains_nan` could return null, which indicates that this information is not available from the file's metadata.
+ This usually occurs when reading from a V1 table, where `contains_nan` is not populated.
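
The documented field order of a `partition_summaries` entry can be captured in a small value type. This Java sketch is hypothetical: it takes the four positional values as strings (as displayed in the table above, e.g. `[false,null,2019-05-13,2019-05-15]`) and models `contains_nan` as a nullable `Boolean`, so that a null value stays "unknown" instead of being read as `false`.

```java
import java.util.List;

// Hypothetical value type for one partition_summaries entry, using the documented
// field order: contains_null, contains_nan, lower_bound, upper_bound.
final class FieldSummary {
  final boolean containsNull;
  final Boolean containsNan; // null: information not available (e.g. V1 tables)
  final String lowerBound;   // bounds kept as display strings for simplicity
  final String upperBound;

  FieldSummary(boolean containsNull, Boolean containsNan, String lowerBound, String upperBound) {
    this.containsNull = containsNull;
    this.containsNan = containsNan;
    this.lowerBound = lowerBound;
    this.upperBound = upperBound;
  }

  /** Builds a summary from the four positional values of one entry. */
  static FieldSummary fromPositional(List<String> values) {
    if (values.size() != 4) {
      throw new IllegalArgumentException("Expected 4 values, got " + values.size());
    }
    String nan = values.get(1);
    return new FieldSummary(
        Boolean.parseBoolean(values.get(0)),
        "null".equals(nan) ? null : Boolean.valueOf(nan),
        values.get(2),
        values.get(3));
  }

  /** True only if the metadata explicitly reports NaN values; unknown is not treated as false. */
  boolean knownToContainNan() {
    return Boolean.TRUE.equals(containsNan);
  }
}
```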
+
+### Partitions
+
+To show a table's current partitions:
+
+```sql
+SELECT * FROM prod.db.table$partitions;
+```
+
+| partition | record_count | file_count | spec_id |
+| -------------- | ------------ | ---------- | ------- |
+| {20211001, 11} | 1 | 1 | 0 |
+| {20211002, 11} | 1 | 1 | 0 |
+| {20211001, 10} | 1 | 1 | 0 |
+| {20211002, 10} | 1 | 1 | 0 |
+
+Note:
+For unpartitioned tables, the partitions table will contain only the record_count and file_count columns.
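
Each partitions row summarizes the current data files in that partition. As a rough illustration, assuming each data file is reduced to its partition value (rendered as a string) and its record count, the following hypothetical Java sketch sums `record_count` and counts files per partition. It only models the shape of the result; it is not Iceberg's implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of how partitions rows relate to data files.
final class PartitionsSketch {
  private PartitionsSketch() {}

  /** A data file reduced to its partition value (display string) and record count. */
  static final class DataFile {
    final String partition;
    final long recordCount;

    DataFile(String partition, long recordCount) {
      this.partition = partition;
      this.recordCount = recordCount;
    }
  }

  /** Aggregated values for one partition: total records and number of files. */
  static final class Stats {
    final long recordCount;
    final int fileCount;

    Stats(long recordCount, int fileCount) {
      this.recordCount = recordCount;
      this.fileCount = fileCount;
    }
  }

  /** Groups files by partition, summing record counts and counting files. */
  static Map<String, Stats> summarize(List<DataFile> files) {
    Map<String, Stats> byPartition = new TreeMap<>();
    for (DataFile file : files) {
      Stats prev = byPartition.get(file.partition);
      byPartition.put(file.partition, prev == null
          ? new Stats(file.recordCount, 1)
          : new Stats(prev.recordCount + file.recordCount, prev.fileCount + 1));
    }
    return byPartition;
  }
}
```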
+
+### All Metadata Tables
+
+These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots.
+
+{{< hint danger >}}
+The "all" metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot.
+{{< /hint >}}
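
Because of these duplicate rows, you may want to keep one row per file when processing the output of an "all" table. A minimal, hypothetical Java helper that keeps the first row seen for each file path (the caller supplies how to extract the path from a row) might look like this:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper for de-duplicating rows from an "all" metadata table.
final class AllFilesDedup {
  private AllFilesDedup() {}

  /** Keeps the first row for each file path, preserving the order in which paths first appear. */
  static <T> List<T> distinctByPath(List<T> rows, Function<T, String> pathOf) {
    Map<String, T> firstByPath = new LinkedHashMap<>();
    for (T row : rows) {
      firstByPath.putIfAbsent(pathOf.apply(row), row);
    }
    return new ArrayList<>(firstByPath.values());
  }
}
```

When only the paths are needed, a SQL query with `SELECT DISTINCT file_path` on the same metadata table achieves a similar effect.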
+
+#### All Data Files
+
+To show all of the table's data files and each file's metadata:
+
+```sql
+SELECT * FROM prod.db.table$all_data_files;
+```
+
+| content | file_path | file_format | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id |
+| ------- | ------------------------------------------------------------ | ----------- | ---------- | ------------ | ------------------ | ------------------ | ------------------ | ----------------- | ---------------- | ----------------------- | ----------------------- | ------------ | ------------- | ------------ | ------------- |
+| 0 | s3://.../dt=20210102/00000-0-756e2512-49ae-45bb-aae3-c0ca475e7879-00001.parquet | PARQUET | {20210102} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210102} | {1 -> 2, 2 -> 20210102} | null | [4] | null | 0 |
+| 0 | s3://.../dt=20210103/00000-0-26222098-032f-472b-8ea5-651a55b21210-00001.parquet | PARQUET | {20210103} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210103} | {1 -> 3, 2 -> 20210103} | null | [4] | null | 0 |
+| 0 | s3://.../dt=20210104/00000-0-a3bb1927-88eb-4f1c-bc6e-19076b0d952e-00001.parquet | PARQUET | {20210104} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210104} | {1 -> 3, 2 -> 20210104} | null | [4] | null | 0 |
+
+#### All Manifests
+
+To show all of the table's manifest files:
+
+```sql
+SELECT * FROM prod.db.table$all_manifests;
+```
+
+| path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries |
+| ------------------------------------------------------------ | ------ | ----------------- | ------------------- | ---------------------- | ------------------------- | ------------------------ | ------------------------------------ |
+| s3://.../metadata/a85f78c5-3222-4b37-b7e4-faf944425d48-m0.avro | 6376 | 0 | 6272782676904868561 | 2 | 0 | 0 | [{false, false, 20210101, 20210101}] |
+
+Note:
+
+1. Fields within `partition_summaries` column of the manifests table correspond to `field_summary` structs within [manifest list](../../../spec#manifest-lists), with the following order:
+ - `contains_null`
+ - `contains_nan`
+ - `lower_bound`
+ - `upper_bound`
+2. `contains_nan` could return null, which indicates that this information is not available from the file's metadata.
+ This usually occurs when reading from a V1 table, where `contains_nan` is not populated.
+
+### References
+
+To show a table's known snapshot references:
+
+```sql
+SELECT * FROM prod.db.table$refs;
+```
+
+| name | type | snapshot_id | max_reference_age_in_ms | min_snapshots_to_keep | max_snapshot_age_in_ms |
+| ------- | ------ | ------------------- | ----------------------- | --------------------- | ---------------------- |
+| main | BRANCH | 4686954189838128572 | 10 | 20 | 30 |
+| testTag | TAG | 4686954189838128572 | 10 | null | null |
+
diff --git a/docs/content/flink-writes.md b/docs/content/flink-writes.md
new file mode 100644
index 00000000..22cf0778
--- /dev/null
+++ b/docs/content/flink-writes.md
@@ -0,0 +1,262 @@
+---
+title: "Flink Writes"
+url: flink-writes
+aliases:
+ - "flink/flink-writes"
+menu:
+ main:
+ parent: Flink
+ weight: 400
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements. See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+# Flink Writes
+
+Iceberg supports batch and streaming writes with [Apache Flink](https://flink.apache.org/)'s DataStream API and Table API.
+
+## Writing with SQL
+
+Iceberg supports both `INSERT INTO` and `INSERT OVERWRITE`.
+
+### `INSERT INTO`
+
+To append new data to a table with a Flink streaming job, use `INSERT INTO`:
+
+```sql
+INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
+INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from other_kafka_table;
+```
+
+### `INSERT OVERWRITE`
+
+To replace data in the table with the result of a query, use `INSERT OVERWRITE` in a batch job (Flink streaming jobs do not support `INSERT OVERWRITE`). Overwrites are atomic operations for Iceberg tables.
+
+Partitions that have rows produced by the SELECT query will be replaced, for example:
+
+```sql
+INSERT OVERWRITE sample VALUES (1, 'a');
+```
+
+Iceberg also supports overwriting a given partition with the `SELECT` values:
+
+```sql
+INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
+```
+
+For a partitioned Iceberg table, when every partition column is given a value in the `PARTITION` clause, the query inserts into a static partition. When only some partition columns (a prefix of all partition columns) are given values in the `PARTITION` clause, the query result is written into dynamic partitions.
+For an unpartitioned Iceberg table, its data is completely overwritten by `INSERT OVERWRITE`.
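
The static versus dynamic rule can be summarized as a small decision function. This Java sketch is hypothetical and only mirrors the rule described above: it assumes the partition columns are given in order, treats a `PARTITION` clause that sets every column as static, a clause that sets a prefix (or no clause at all) as dynamic, and rejects a clause that does not set a prefix of the partition columns. Flink and Iceberg perform their own validation, which may differ in detail.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper mirroring the documented INSERT OVERWRITE rule;
// it is not a Flink or Iceberg API.
final class OverwriteMode {
  private OverwriteMode() {}

  enum Kind { STATIC, DYNAMIC, WHOLE_TABLE }

  /**
   * partitionColumns: the table's partition columns in order (empty if unpartitioned).
   * partitionClause: column -> value pairs set in the PARTITION clause (empty if absent).
   */
  static Kind classify(List<String> partitionColumns, Map<String, String> partitionClause) {
    if (partitionColumns.isEmpty()) {
      if (!partitionClause.isEmpty()) {
        throw new IllegalArgumentException("Unpartitioned table cannot take a PARTITION clause");
      }
      return Kind.WHOLE_TABLE; // all existing data is replaced
    }
    // Count how many leading partition columns are set in the PARTITION clause.
    int prefix = 0;
    while (prefix < partitionColumns.size()
        && partitionClause.containsKey(partitionColumns.get(prefix))) {
      prefix++;
    }
    if (prefix != partitionClause.size()) {
      throw new IllegalArgumentException("PARTITION clause must set a prefix of the partition columns");
    }
    return prefix == partitionColumns.size() ? Kind.STATIC : Kind.DYNAMIC;
  }
}
```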
+
+### `UPSERT`
+
+Iceberg supports `UPSERT` based on the primary key when writing data into a v2 table. There are two ways to enable upsert.
+
+1. Enable `UPSERT` mode with the table-level property `write.upsert.enabled`. Here is an example SQL statement that sets the table property when creating a table. It applies to all write paths to this table (batch or streaming) unless overridden by write options, as described later.
+
+```sql
+CREATE TABLE `hive_catalog`.`default`.`sample` (
+ `id` INT UNIQUE COMMENT 'unique id',
+ `data` STRING NOT NULL,
+ PRIMARY KEY(`id`) NOT ENFORCED
+) with ('format-version'='2', 'write.upsert.enabled'='true');
+```
+
+2. Enabling `UPSERT` mode using `upsert-enabled` in the [write options](#Write-options) provides more flexibility than a table-level config. Note that you still need to use the v2 table format and specify the primary key when creating the table.
+
+```sql
+INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
+...
+```
+
+{{< hint info >}}
+OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.
+{{< /hint >}}
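
The requirements for `UPSERT` listed in this section can be collected into a single pre-flight check. The following Java sketch is hypothetical (it is not an Iceberg or Flink API) and works on column names: it reports a problem when the table is not format v2, when there are no equality fields (the primary key columns), when overwrite is also requested, or when a partition source column is missing from the equality fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical pre-flight check for the documented UPSERT rules.
final class UpsertCheck {
  private UpsertCheck() {}

  /** Returns human-readable problems; an empty list means the rules above are satisfied. */
  static List<String> problems(int formatVersion, Set<String> equalityFields,
                               Set<String> partitionSourceColumns, boolean overwrite) {
    List<String> problems = new ArrayList<>();
    if (formatVersion < 2) {
      problems.add("UPSERT requires format-version 2");
    }
    if (equalityFields.isEmpty()) {
      problems.add("UPSERT requires a primary key (equality fields)");
    }
    if (overwrite) {
      problems.add("OVERWRITE and UPSERT can't be set together");
    }
    // Sorted iteration keeps the messages in a stable order.
    for (String column : new TreeSet<>(partitionSourceColumns)) {
      if (!equalityFields.contains(column)) {
        problems.add("Partition column '" + column + "' must be an equality field");
      }
    }
    return problems;
  }
}
```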
+
+
+
+## Writing with DataStream
+
+Iceberg supports writing to Iceberg tables from different DataStream inputs.
+
+
+### Appending data
+
+Flink natively supports writing `DataStream<RowData>` and `DataStream<Row>` to an Iceberg sink table.
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+ .tableLoader(tableLoader)
+ .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+The Iceberg API also allows users to write a generic `DataStream<T>` to an Iceberg table; more examples can be found in this [unit test](https://github.com/apache/iceberg/blob/master/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java).
+
+### Overwrite data
+
+Set the `overwrite` flag in the FlinkSink builder to overwrite the data in existing Iceberg tables:
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+ .tableLoader(tableLoader)
+ .overwrite(true)
+ .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+### Upsert data
+
+Set the `upsert` flag in the FlinkSink builder to upsert the data in an existing Iceberg table. The table must use the v2 table format and have a primary key.
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+ .tableLoader(tableLoader)
+ .upsert(true)
+ .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+{{< hint info >}}
+OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.
+{{< /hint >}}
+
+### Write with Avro GenericRecord
+
+The Flink Iceberg sink provides `AvroGenericRecordToRowDataMapper`, which converts
+Avro `GenericRecord` to Flink `RowData`. You can use the mapper to write an
+Avro `GenericRecord` DataStream to Iceberg.
+
+Make sure the `flink-avro` jar is included in the classpath.
+The shaded `iceberg-flink-runtime` bundle jar can't be used,
+because it shades the Avro package.
+Use the non-shaded `iceberg-flink` jar instead.
+
+```java
+DataStream<org.apache.avro.generic.GenericRecord> dataStream = ...;
+
+Schema icebergSchema = table.schema();
+
+// Note: the Avro schema converted from the Iceberg schema may not work for
+// time columns, due to the precision difference between how the Iceberg schema (micros)
+// and Flink AvroToRowDataConverters (millis) deal with the time type.
+// In that case, use an Avro schema defined directly instead.
+// See the AvroGenericRecordToRowDataMapper Javadoc for more details.
+org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, table.name());
+
+GenericRecordAvroTypeInfo avroTypeInfo = new GenericRecordAvroTypeInfo(avroSchema);
+RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+
+FlinkSink.builderFor(
+ dataStream,
+ AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema),
+ FlinkCompatibilityUtil.toTypeInfo(rowType))
+ .table(table)
+ .tableLoader(tableLoader)
+ .append();
+```
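
The precision issue mentioned in the comment above comes from unit truncation: Iceberg stores `time` values as microseconds since midnight, while a millisecond-based converter can only keep whole milliseconds. This stdlib-only Java sketch (hypothetical helper names, not Iceberg or Flink code) shows a round trip through milliseconds dropping the sub-millisecond part.

```java
import java.time.LocalTime;

// Hypothetical helpers showing how a micros -> millis conversion truncates time values.
final class TimePrecision {
  private TimePrecision() {}

  /** Iceberg's time representation: microseconds since midnight. */
  static long toMicros(LocalTime time) {
    return time.toNanoOfDay() / 1_000L;
  }

  /** A millisecond-based converter keeps only whole milliseconds (integer division truncates). */
  static long microsToMillis(long micros) {
    return micros / 1_000L;
  }

  /** Rebuilds a LocalTime from milliseconds since midnight. */
  static LocalTime fromMillis(long millis) {
    return LocalTime.ofNanoOfDay(millis * 1_000_000L);
  }
}
```

For example, `10:15:30.123456` becomes `10:15:30.123` after the round trip; the trailing 456 microseconds are lost.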
+
+### Metrics
+
+The following Flink metrics are provided by the Flink Iceberg sink.
+
+Parallel writer metrics are added under the sub group of `IcebergStreamWriter`.
+They should have the following key-value tags.
+
+* table: full table name (like iceberg.my_db.my_table)
+* subtask_index: writer subtask index starting from 0
+
+| Metric name | Metric type | Description |
+| -------------------------- | ----------- | ---------------------------------------------------------------------------------------------------------- |
+| lastFlushDurationMs | Gauge | The duration (in milliseconds) that writer subtasks take to flush and upload the files during checkpoint. |
+| flushedDataFiles | Counter | Number of data files flushed and uploaded. |
+| flushedDeleteFiles | Counter | Number of delete files flushed and uploaded. |
+| flushedReferencedDataFiles | Counter | Number of data files referenced by the flushed delete files. |
+| dataFilesSizeHistogram | Histogram | Histogram distribution of data file sizes (in bytes). |
+| deleteFilesSizeHistogram | Histogram | Histogram distribution of delete file sizes (in bytes). |
+
+Committer metrics are added under the `IcebergFilesCommitter` sub group,
+with the following key-value tag:
+
+* `table`: full table name (for example, `iceberg.my_db.my_table`)
+
+| Metric name                             | Metric type | Description                                                                       |
+|-----------------------------------------|-------------|-----------------------------------------------------------------------------------|
+| lastCheckpointDurationMs                | Gauge       | The duration (in milliseconds) that the committer operator takes to checkpoint its state. |
+| lastCommitDurationMs                    | Gauge       | The duration (in milliseconds) that the Iceberg table commit takes.              |
+| committedDataFilesCount                 | Counter     | Number of data files committed.                                                   |
+| committedDataFilesRecordCount           | Counter     | Number of records contained in the committed data files.                          |
+| committedDataFilesByteCount             | Counter     | Number of bytes contained in the committed data files.                            |
+| committedDeleteFilesCount               | Counter     | Number of delete files committed.                                                 |
+| committedDeleteFilesRecordCount         | Counter     | Number of records contained in the committed delete files.                        |
+| committedDeleteFilesByteCount           | Counter     | Number of bytes contained in the committed delete files.                          |
+| elapsedSecondsSinceLastSuccessfulCommit | Gauge       | Elapsed time (in seconds) since the last successful Iceberg commit.               |
+
+`elapsedSecondsSinceLastSuccessfulCommit` is an ideal alerting metric
+to detect failed or missing Iceberg commits, because commits can stop without the Flink job failing:
+
+* The Iceberg commit happens after a successful Flink checkpoint, in the `notifyCheckpointComplete` callback.
+  Iceberg commits can fail (for whatever reason) while Flink checkpoints keep succeeding.
+* `notifyCheckpointComplete` may also not be triggered (for example, due to a bug).
+  In that case, no Iceberg commit is attempted at all.
+
+For example, if the checkpoint interval (and thus the expected Iceberg commit interval) is 5 minutes, set up an alert with a rule like `elapsedSecondsSinceLastSuccessfulCommit > 3600` (60 minutes) to detect failed or missing Iceberg commits in the past hour.
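+
+To illustrate the reasoning behind the threshold, here is a minimal plain-Java sketch that evaluates such a rule against a value read from the gauge. It is not part of the Iceberg sink. `CommitLagAlert`, `shouldAlert`, and `elapsedCommitIntervals` are hypothetical names. In practice, the rule would be configured in your metrics or alerting system.
+
+```java
+import java.time.Duration;
+
+// Hypothetical helper (not an Iceberg API): evaluates an alert rule against
+// the value of the elapsedSecondsSinceLastSuccessfulCommit gauge.
+class CommitLagAlert {
+
+  // Fire when no Iceberg commit has succeeded within the threshold.
+  static boolean shouldAlert(long elapsedSecondsSinceLastSuccessfulCommit, Duration threshold) {
+    return elapsedSecondsSinceLastSuccessfulCommit > threshold.getSeconds();
+  }
+
+  // Number of full commit intervals elapsed since the last successful commit,
+  // i.e. roughly how many expected commits did not happen.
+  static long elapsedCommitIntervals(long elapsedSeconds, Duration checkpointInterval) {
+    return elapsedSeconds / checkpointInterval.getSeconds();
+  }
+
+  public static void main(String[] args) {
+    Duration checkpointInterval = Duration.ofMinutes(5);
+    Duration threshold = Duration.ofMinutes(60); // 3600 seconds
+
+    long healthy = 240;  // last successful commit 4 minutes ago
+    long stalled = 3900; // last successful commit 65 minutes ago
+
+    System.out.println(shouldAlert(healthy, threshold)); // false
+    System.out.println(shouldAlert(stalled, threshold)); // true
+    System.out.println(elapsedCommitIntervals(stalled, checkpointInterval)); // 13
+  }
+}
+```
+
+Setting the threshold to many checkpoint intervals (12 here) keeps a single slow or retried commit from triggering the alert.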
+
+
+
+## Options
+
+### Write options
+
+Flink write options are passed when configuring the `FlinkSink`, like this:
+
+```java
+FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+    .table(table)
+    .tableLoader(tableLoader)
+    .set("write-format", "orc")
+    // set(String, String) takes the option key, so pass the key of the ConfigOption
+    .set(FlinkWriteOptions.OVERWRITE_MODE.key(), "true");
+```
+
+For Flink SQL, write options can be passed in via SQL hints like this:
+
+```sql
+INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
+...
+```
+
+See [write options](/flink-configuration#write-options) for the full list of options.
\ No newline at end of file
diff --git a/docs/content/spark-configuration.md b/docs/content/spark-configuration.md
index d2267d82..70c415db 100644
--- a/docs/content/spark-configuration.md
+++ b/docs/content/spark-configuration.md
@@ -65,7 +65,7 @@ Both catalogs are configured using properties nested under the catalog name. Com
| Property | Values | Description |
| -------------------------------------------------- | ----------------------------- | -------------------------------------------------------------------- |
-| spark.sql.catalog._catalog-name_.type | `hive` or `hadoop` | The underlying Iceberg catalog implementation, `HiveCatalog`, `HadoopCatalog` or left unset if using a custom catalog |
+| spark.sql.catalog._catalog-name_.type | `hive`, `hadoop` or `rest` | The underlying Iceberg catalog implementation, `HiveCatalog`, `HadoopCatalog`, `RESTCatalog` or left unset if using a custom catalog |
| spark.sql.catalog._catalog-name_.catalog-impl | | The underlying Iceberg catalog implementation.|
| spark.sql.catalog._catalog-name_.default-namespace | default | The default current namespace for the catalog |
| spark.sql.catalog._catalog-name_.uri | thrift://host:port | Metastore connect URI; default from `hive-site.xml` |
diff --git a/docs/content/spark-procedures.md b/docs/content/spark-procedures.md
index c7343a3d..382606d6 100644
--- a/docs/content/spark-procedures.md
+++ b/docs/content/spark-procedures.md
@@ -587,3 +587,119 @@ Get all the snapshot ancestors by a particular snapshot
CALL spark_catalog.system.ancestors_of('db.tbl', 1)
CALL spark_catalog.system.ancestors_of(snapshot_id => 1, table => 'db.tbl')
```
+
+## Change Data Capture
+
+### `create_changelog_view`
+
+Creates a view that contains the changes from a given table.
+
+#### Usage
+
+| Argument Name | Required? | Type | Description |
+|---------------|----------|------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `table` | ✔️ | string | Name of the source table for the changelog |
+| `changelog_view` | | string | Name of the view to create |
+| `options` | | map<string, string> | A map of Spark read options to use |
+|`compute_updates`| | boolean | Whether to compute pre/post update images (see below for more information). Defaults to false. |
+|`identifier_columns`| | array<string> | The list of identifier columns to compute updates. If the argument `compute_updates` is set to true and `identifier_columns` are not provided, the table’s current identifier fields will be used to compute updates. |
+|`remove_carryovers`| | boolean | Whether to remove carry-over rows (see below for more information). Defaults to true. |
+
+Here is a list of commonly used Spark read options:
+* `start-snapshot-id`: the exclusive start snapshot ID. If not provided, it reads from the table's first snapshot inclusively.
+* `end-snapshot-id`: the inclusive end snapshot ID. Defaults to the table's current snapshot.
+* `start-timestamp`: the exclusive start timestamp. If not provided, it reads from the table's first snapshot inclusively.
+* `end-timestamp`: the inclusive end timestamp. Defaults to the table's current snapshot.
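+
+To make the exclusive-start and inclusive-end semantics concrete, here is a plain-Java sketch of which snapshots a range selects. It assumes a linear history listed oldest first. Iceberg actually follows each snapshot's parent lineage, and snapshot IDs are not ordered numerically. `SnapshotRange.select` is a hypothetical helper, not a Spark or Iceberg API. The same rule applies to `start-timestamp` and `end-timestamp`.
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+
+// Hypothetical sketch of (start, end] range selection over snapshots.
+// Snapshots are matched by ID equality, not compared numerically.
+class SnapshotRange {
+
+  // startId == null: read from the first snapshot, inclusive.
+  // endId == null: read up to the current (last) snapshot.
+  static List<Long> select(List<Long> snapshotsOldestFirst, Long startId, Long endId) {
+    int from = 0;
+    if (startId != null) {
+      int startIndex = snapshotsOldestFirst.indexOf(startId);
+      if (startIndex < 0) {
+        throw new IllegalArgumentException("Unknown start snapshot: " + startId);
+      }
+      from = startIndex + 1; // exclusive start
+    }
+    int to = snapshotsOldestFirst.size() - 1;
+    if (endId != null) {
+      to = snapshotsOldestFirst.indexOf(endId);
+      if (to < 0) {
+        throw new IllegalArgumentException("Unknown end snapshot: " + endId);
+      }
+    }
+    List<Long> selected = new ArrayList<>();
+    for (int i = from; i <= to; i++) {
+      selected.add(snapshotsOldestFirst.get(i)); // inclusive end
+    }
+    return selected;
+  }
+
+  public static void main(String[] args) {
+    List<Long> snapshots = List.of(1L, 2L, 3L);
+    System.out.println(select(snapshots, 1L, 2L));   // [2]
+    System.out.println(select(snapshots, null, 2L)); // [1, 2]
+    System.out.println(select(snapshots, 1L, null)); // [2, 3]
+  }
+}
+```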
+
+#### Output
+| Output Name | Type | Description |
+| ------------|------|----------------------------------------|
+| `changelog_view` | string | The name of the created changelog view |
+
+#### Examples
+
+Create a changelog view `tbl_changes` based on the changes that happened between snapshot `1` (exclusive) and `2` (inclusive).
+```sql
+CALL spark_catalog.system.create_changelog_view(
+ table => 'db.tbl',
+ options => map('start-snapshot-id','1','end-snapshot-id', '2')
+)
+```
+
+Create a changelog view `my_changelog_view` based on the changes that happened between timestamp `1678335750489` (exclusive) and `1678992105265` (inclusive).
+```sql
+CALL spark_catalog.system.create_changelog_view(
+ table => 'db.tbl',
+ options => map('start-timestamp','1678335750489','end-timestamp', '1678992105265'),
+ changelog_view => 'my_changelog_view'
+)
+```
+
+Create a changelog view that computes updates based on the identifier columns `id` and `name`.
+```sql
+CALL spark_catalog.system.create_changelog_view(
+ table => 'db.tbl',
+ options => map('start-snapshot-id','1','end-snapshot-id', '2'),
+ identifier_columns => array('id', 'name')
+)
+```
+
+Once the changelog view is created, you can query the view to see the changes that happened between the snapshots.
+```sql
+SELECT * FROM tbl_changes
+```
+```sql
+SELECT * FROM tbl_changes WHERE _change_type = 'INSERT' AND id = 3 ORDER BY _change_ordinal
+```
+Note that the changelog view includes Change Data Capture (CDC) metadata columns
+that provide additional information about the changes being tracked. These columns are:
+- `_change_type`: the type of change. It has one of the following values: `INSERT`, `DELETE`, `UPDATE_BEFORE`, or `UPDATE_AFTER`.
+- `_change_ordinal`: the order of changes; changes from the same snapshot share the same ordinal, starting from 0
+- `_commit_snapshot_id`: the snapshot ID where the change occurred
+
+Here is an example of the corresponding results. It shows that the first snapshot inserted 2 records, and the
+second snapshot deleted 1 record.
+
+| id | name |_change_type | _change_ordinal | _commit_snapshot_id |
+|---|--------|---|---|---|
+|1 | Alice |INSERT |0 |5390529835796506035|
+|2 | Bob |INSERT |0 |5390529835796506035|
+|1 | Alice |DELETE |1 |8764748981452218370|
+
+#### Carry-over Rows
+
+The procedure removes carry-over rows by default. Carry-over rows are the result of row-level operations (`MERGE`, `UPDATE` and `DELETE`)
+when using copy-on-write. For example, suppose a file contains row1 `(id=1, name='Alice')` and row2 `(id=2, name='Bob')`.
+A copy-on-write delete of row2 requires erasing this file and preserving row1 in a new file. The changelog table
+reports this as the following pair of rows for row1, even though row1 did not actually change:
+
+| id | name | _change_type |
+|-----|-------|--------------|
+| 1 | Alice | DELETE |
+| 1 | Alice | INSERT |
+
+By default, the procedure finds carry-over rows and removes them from the view. You can disable this
+behavior by setting the `remove_carryovers` argument to `false`.
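+
+The idea behind carry-over removal can be sketched in plain Java: within one snapshot, a `DELETE` and an `INSERT` with identical values in every column cancel each other out. This sketch is not the procedure's implementation, which runs as Spark operations over the changelog. The `Row` and `Change` records and `removeCarryovers` are hypothetical. The sketch assumes that all changes come from the same snapshot and that only `INSERT` and `DELETE` changes are present.
+
+```java
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// Hypothetical sketch: removes carry-over rows from the changes of ONE snapshot.
+class CarryoverRemoval {
+
+  // Illustrative types (not Iceberg or Spark classes).
+  record Row(int id, String name) {}
+
+  record Change(Row row, String changeType) {}
+
+  static List<Change> removeCarryovers(List<Change> changes) {
+    // Count DELETE and INSERT changes per identical row (only these two types are handled).
+    Map<Row, Integer> deletes = new HashMap<>();
+    Map<Row, Integer> inserts = new HashMap<>();
+    for (Change c : changes) {
+      Map<Row, Integer> counts = c.changeType().equals("DELETE") ? deletes : inserts;
+      counts.merge(c.row(), 1, Integer::sum);
+    }
+
+    // Each matching DELETE/INSERT pair is one carry-over; skip that many of each.
+    Map<Row, Integer> deletesToSkip = new HashMap<>();
+    Map<Row, Integer> insertsToSkip = new HashMap<>();
+    for (Map.Entry<Row, Integer> e : deletes.entrySet()) {
+      int pairs = Math.min(e.getValue(), inserts.getOrDefault(e.getKey(), 0));
+      deletesToSkip.put(e.getKey(), pairs);
+      insertsToSkip.put(e.getKey(), pairs);
+    }
+
+    // Keep every other change, preserving the input order.
+    List<Change> result = new ArrayList<>();
+    for (Change c : changes) {
+      Map<Row, Integer> toSkip = c.changeType().equals("DELETE") ? deletesToSkip : insertsToSkip;
+      int remaining = toSkip.getOrDefault(c.row(), 0);
+      if (remaining > 0) {
+        toSkip.put(c.row(), remaining - 1);
+      } else {
+        result.add(c);
+      }
+    }
+    return result;
+  }
+
+  public static void main(String[] args) {
+    // Copy-on-write delete of Bob: Alice is deleted and re-inserted unchanged.
+    List<Change> changes = List.of(
+        new Change(new Row(1, "Alice"), "DELETE"),
+        new Change(new Row(2, "Bob"), "DELETE"),
+        new Change(new Row(1, "Alice"), "INSERT"));
+    for (Change c : removeCarryovers(changes)) {
+      System.out.println(c.changeType() + " " + c.row().id() + " " + c.row().name());
+    }
+    // prints: DELETE 2 Bob
+  }
+}
+```
+
+Counting pairs, instead of dropping every row that appears as both a `DELETE` and an `INSERT`, keeps the sketch correct when the same row values occur more than once.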
+
+#### Pre/Post Update Images
+
+The procedure computes pre/post update images if configured (see the `compute_updates` and `identifier_columns` arguments).
+Pre/post update images are converted from a pair of a delete row and an insert row. Identifier columns are used to determine
+whether an insert and a delete record refer to the same row: if the two records share the same values for the identifier columns,
+they are considered the before and after states of the same row. You can either set identifier fields in the table schema or pass
+them with the `identifier_columns` argument.
+
+The following example shows how pre/post update images are computed with an identifier column (`id`), where a row deletion
+and an insertion with the same `id` are treated as a single update operation. Specifically, suppose we have the following pair of rows:
+
+| id | name | _change_type |
+|-----|--------|--------------|
+| 3 | Robert | DELETE |
+| 3 | Dan | INSERT |
+
+In this case, the procedure marks the row before the update as an `UPDATE_BEFORE` image and the row after the update
+as an `UPDATE_AFTER` image, resulting in the following pre/post update images:
+
+| id | name | _change_type |
+|-----|--------|--------------|
+| 3 | Robert | UPDATE_BEFORE|
+| 3 | Dan | UPDATE_AFTER |
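+
+The pairing step can be sketched in plain Java with `id` as the only identifier column. This is an illustration, not the procedure's implementation. The `Change` record and `computeUpdates` are hypothetical. The sketch assumes that all changes come from one snapshot and that each `id` has at most one `DELETE` and one `INSERT`.
+
+```java
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+// Hypothetical sketch: relabels a DELETE/INSERT pair sharing the same `id`
+// as UPDATE_BEFORE/UPDATE_AFTER; unmatched changes are kept unchanged.
+class UpdateImages {
+
+  // Illustrative change type (not an Iceberg or Spark class).
+  record Change(int id, String name, String changeType) {}
+
+  static List<Change> computeUpdates(List<Change> changes) {
+    // Collect the identifier values seen in DELETE and in INSERT changes.
+    Set<Integer> deletedIds = new HashSet<>();
+    Set<Integer> insertedIds = new HashSet<>();
+    for (Change c : changes) {
+      if (c.changeType().equals("DELETE")) {
+        deletedIds.add(c.id());
+      } else if (c.changeType().equals("INSERT")) {
+        insertedIds.add(c.id());
+      }
+    }
+
+    List<Change> result = new ArrayList<>();
+    for (Change c : changes) {
+      boolean isUpdate = deletedIds.contains(c.id()) && insertedIds.contains(c.id());
+      if (!isUpdate) {
+        result.add(c); // unmatched DELETE or INSERT stays as-is
+      } else if (c.changeType().equals("DELETE")) {
+        result.add(new Change(c.id(), c.name(), "UPDATE_BEFORE"));
+      } else {
+        result.add(new Change(c.id(), c.name(), "UPDATE_AFTER"));
+      }
+    }
+    return result;
+  }
+
+  public static void main(String[] args) {
+    List<Change> changes = List.of(
+        new Change(3, "Robert", "DELETE"),
+        new Change(3, "Dan", "INSERT"),
+        new Change(1, "Alice", "DELETE"));
+    for (Change c : computeUpdates(changes)) {
+      System.out.println(c.id() + " " + c.name() + " " + c.changeType());
+    }
+    // prints:
+    // 3 Robert UPDATE_BEFORE
+    // 3 Dan UPDATE_AFTER
+    // 1 Alice DELETE
+  }
+}
+```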
\ No newline at end of file