Posted to commits@iceberg.apache.org by gi...@apache.org on 2023/04/12 15:16:05 UTC

[iceberg-docs] branch asf-site updated: deploy: 253d32e7506868583f5f34d32df59de55d8d14e2

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/iceberg-docs.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 7e72135a deploy: 253d32e7506868583f5f34d32df59de55d8d14e2
7e72135a is described below

commit 7e72135a3ded5c6c3d177d7eeb8543ac6d900bf7
Author: danielcweeks <da...@users.noreply.github.com>
AuthorDate: Wed Apr 12 15:15:59 2023 +0000

    deploy: 253d32e7506868583f5f34d32df59de55d8d14e2
---
 docs/1.2.1/api/index.html                        |   2 +-
 docs/1.2.1/aws/index.html                        |  11 +-
 docs/1.2.1/configuration/index.html              |   2 +-
 docs/1.2.1/custom-catalog/index.html             |   2 +-
 docs/1.2.1/dell/index.html                       |   2 +-
 docs/1.2.1/evolution/index.html                  |   2 +-
 docs/1.2.1/{schemas => flink-actions}/index.html |  19 +-
 docs/1.2.1/flink-configuration/index.html        |  46 ++++
 docs/1.2.1/flink-connector/index.html            |   2 +-
 docs/1.2.1/flink-ddl/index.html                  |  78 ++++++
 docs/1.2.1/flink-queries/index.html              | 176 +++++++++++++
 docs/1.2.1/flink-writes/index.html               | 106 ++++++++
 docs/1.2.1/flink/flink-actions/index.html        |   1 +
 docs/1.2.1/flink/flink-configuration/index.html  |   1 +
 docs/1.2.1/flink/flink-ddl/index.html            |   1 +
 docs/1.2.1/flink/flink-queries/index.html        |   1 +
 docs/1.2.1/flink/flink-writes/index.html         |   1 +
 docs/1.2.1/flink/flink/index.html                |   1 +
 docs/1.2.1/flink/index.html                      | 312 ++---------------------
 docs/1.2.1/getting-started/index.html            |   2 +-
 docs/1.2.1/hive/index.html                       |   2 +-
 docs/1.2.1/index.html                            |   2 +-
 docs/1.2.1/index.xml                             |  18 +-
 docs/1.2.1/java-api-quickstart/index.html        |   2 +-
 docs/1.2.1/jdbc/index.html                       |   2 +-
 docs/1.2.1/maintenance/index.html                |   2 +-
 docs/1.2.1/nessie/index.html                     |   2 +-
 docs/1.2.1/partitioning/index.html               |   2 +-
 docs/1.2.1/performance/index.html                |   2 +-
 docs/1.2.1/reliability/index.html                |   2 +-
 docs/1.2.1/schemas/index.html                    |   2 +-
 docs/1.2.1/sitemap.xml                           |   2 +-
 docs/1.2.1/spark-configuration/index.html        |   4 +-
 docs/1.2.1/spark-ddl/index.html                  |   2 +-
 docs/1.2.1/spark-procedures/index.html           |  31 ++-
 docs/1.2.1/spark-queries/index.html              |   2 +-
 docs/1.2.1/spark-structured-streaming/index.html |   2 +-
 docs/1.2.1/spark-writes/index.html               |   2 +-
 38 files changed, 521 insertions(+), 330 deletions(-)

diff --git a/docs/1.2.1/api/index.html b/docs/1.2.1/api/index.html
index f7050ea9..2e37f0b4 100644
--- a/docs/1.2.1/api/index.html
+++ b/docs/1.2.1/api/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class=chevron-toggle data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/aws/index.html b/docs/1.2.1/aws/index.html
index eddfaf79..78800295 100644
--- a/docs/1.2.1/aws/index.html
+++ b/docs/1.2.1/aws/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class="collapse in"><ul class=sub-menu><li><a id=active href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
@@ -33,7 +33,6 @@ Here are some examples.</p><h3 id=spark>Spark</h3><p>For example, to use AWS fea
 </span></span><span style=display:flex><span>AWS_MAVEN_GROUP<span style=color:#f92672>=</span>software.amazon.awssdk
 </span></span><span style=display:flex><span>AWS_PACKAGES<span style=color:#f92672>=(</span>
 </span></span><span style=display:flex><span>    <span style=color:#e6db74>&#34;bundle&#34;</span>
-</span></span><span style=display:flex><span>    <span style=color:#e6db74>&#34;url-connection-client&#34;</span>
 </span></span><span style=display:flex><span><span style=color:#f92672>)</span>
 </span></span><span style=display:flex><span><span style=color:#66d9ef>for</span> pkg in <span style=color:#e6db74>&#34;</span><span style=color:#e6db74>${</span>AWS_PACKAGES[@]<span style=color:#e6db74>}</span><span style=color:#e6db74>&#34;</span>; <span style=color:#66d9ef>do</span>
 </span></span><span style=display:flex><span>    DEPENDENCIES<span style=color:#f92672>+=</span><span style=color:#e6db74>&#34;,</span>$AWS_MAVEN_GROUP<span style=color:#e6db74>:</span>$pkg<span style=color:#e6db74>:</span>$AWS_SDK_VERSION<span style=color:#e6db74>&#34;</span>
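With the iceberg-spark-runtime and AWS bundle jars on the classpath and a Glue-backed Spark catalog configured (the page builds the dependency list for spark-shell as above), plain Spark SQL works against that catalog. A minimal sketch, assuming a hypothetical catalog registered as `my_catalog` via `spark.sql.catalog.my_catalog` and a database `db`:

    CREATE TABLE my_catalog.db.sample (id bigint, data string) USING iceberg;
    INSERT INTO my_catalog.db.sample VALUES (1, 'a'), (2, 'b');
    SELECT * FROM my_catalog.db.sample;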
@@ -57,7 +56,6 @@ Here are some examples.</p><h3 id=spark>Spark</h3><p>For example, to use AWS fea
 </span></span><span style=display:flex><span>AWS_MAVEN_URL<span style=color:#f92672>=</span>$MAVEN_URL/software/amazon/awssdk
 </span></span><span style=display:flex><span>AWS_PACKAGES<span style=color:#f92672>=(</span>
 </span></span><span style=display:flex><span>    <span style=color:#e6db74>&#34;bundle&#34;</span>
-</span></span><span style=display:flex><span>    <span style=color:#e6db74>&#34;url-connection-client&#34;</span>
 </span></span><span style=display:flex><span><span style=color:#f92672>)</span>
 </span></span><span style=display:flex><span><span style=color:#66d9ef>for</span> pkg in <span style=color:#e6db74>&#34;</span><span style=color:#e6db74>${</span>AWS_PACKAGES[@]<span style=color:#e6db74>}</span><span style=color:#e6db74>&#34;</span>; <span style=color:#66d9ef>do</span>
 </span></span><span style=display:flex><span>    wget $AWS_MAVEN_URL/$pkg/$AWS_SDK_VERSION/$pkg-$AWS_SDK_VERSION.jar
@@ -67,7 +65,6 @@ Here are some examples.</p><h3 id=spark>Spark</h3><p>For example, to use AWS fea
 </span></span><span style=display:flex><span>/path/to/bin/sql-client.sh embedded <span style=color:#ae81ff>\
 </span></span></span><span style=display:flex><span><span style=color:#ae81ff></span>    -j iceberg-flink-runtime-$ICEBERG_VERSION.jar <span style=color:#ae81ff>\
 </span></span></span><span style=display:flex><span><span style=color:#ae81ff></span>    -j bundle-$AWS_SDK_VERSION.jar <span style=color:#ae81ff>\
-</span></span></span><span style=display:flex><span><span style=color:#ae81ff></span>    -j url-connection-client-$AWS_SDK_VERSION.jar <span style=color:#ae81ff>\
 </span></span></span><span style=display:flex><span><span style=color:#ae81ff></span>    shell
 </span></span></code></pre></div><p>With those dependencies, you can create a Flink catalog like the following:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>CREATE</span> <span style=color:#66d9ef>CATALOG</span> my_catalog <span style=color:#66d9ef>WITH</span> (
 </span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;iceberg&#39;</span>,
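For reference, a complete Glue-backed Flink catalog definition of the shape this page documents; the bucket path and catalog name are placeholders:

    CREATE CATALOG my_catalog WITH (
      'type'='iceberg',
      'warehouse'='s3://my-bucket/my/key/prefix',
      'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
      'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
    );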
@@ -84,7 +81,6 @@ Here are some examples.</p><h3 id=spark>Spark</h3><p>For example, to use AWS fea
 </span></span></code></pre></div><h3 id=hive>Hive</h3><p>To use the AWS module with Hive, you can download the necessary dependencies as in the Flink example,
 and then add them to the Hive classpath or add the jars at runtime in the CLI:</p><pre tabindex=0><code>add jar /my/path/to/iceberg-hive-runtime.jar;
 add jar /my/path/to/aws/bundle.jar;
-add jar /my/path/to/aws/url-connection-client.jar;
 </code></pre><p>With those dependencies, you can register a Glue catalog and create external tables in Hive at runtime in the CLI:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>SET</span> iceberg.engine.hive.enabled<span style=color:#f92672>=</span><span style=color:#66d9ef>true</span>;
 </span></span><span style=display:flex><span><span style=color:#66d9ef>SET</span> hive.vectorized.execution.enabled<span style=color:#f92672>=</span><span style=color:#66d9ef>false</span>;
 </span></span><span style=display:flex><span><span style=color:#66d9ef>SET</span> iceberg.<span style=color:#66d9ef>catalog</span>.glue.<span style=color:#66d9ef>catalog</span><span style=color:#f92672>-</span>impl<span style=color:#f92672>=</span>org.apache.iceberg.aws.glue.GlueCatalog;
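A sketch of how that Hive CLI session typically continues, pointing the Glue catalog at a warehouse and creating an Iceberg-backed external table; the warehouse path and table name are placeholders:

    SET iceberg.catalog.glue.warehouse=s3://my-bucket/my/key/prefix;
    CREATE EXTERNAL TABLE my_table (i int)
    STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    TBLPROPERTIES ('iceberg.catalog'='glue');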
@@ -116,7 +112,7 @@ This is only added for users that have existing conventions using non-standard c
 and table name validation are skipped, there is no guarantee that downstream systems would all support the names.</p><h4 id=optimistic-locking>Optimistic Locking</h4><p>By default, Iceberg uses Glue&rsquo;s optimistic locking for concurrent updates to a table.
 With optimistic locking, each table has a version id.
 If users retrieve the table metadata, Iceberg records the version id of that table.
-Users can update the table, but only if the version id on the server side has not changed.
+Users can update the table as long as the version ID on the server side remains unchanged.
If there is a version mismatch, it means that someone else has modified the table first.
The update attempt fails because you have a stale version of the table.
If this happens, Iceberg refreshes the metadata and checks whether there is a potential conflict.
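Where Glue's optimistic locking cannot be used, the Iceberg AWS module also ships a DynamoDB-based lock manager that is enabled through catalog properties. A hedged Flink SQL sketch, assuming the `lock-impl` and `lock.table` properties from the Iceberg AWS docs; the lock table name is a placeholder:

    CREATE CATALOG glue_catalog WITH (
      'type'='iceberg',
      'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
      'warehouse'='s3://my-bucket/my/key/prefix',
      'lock-impl'='org.apache.iceberg.aws.dynamodb.DynamoDbLockManager',
      'lock.table'='myGlueLockTable'
    );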
@@ -245,7 +241,7 @@ Here is an example to start Spark shell with this client factory:</p><div class=
 and <a href=https://mvnrepository.com/artifact/software.amazon.awssdk/apache-client>Apache HTTP Client</a>.
 By default, AWS clients use <strong>URL Connection</strong> HTTP Client to communicate with the service.
This HTTP client optimizes for minimum dependencies and startup latency but supports less functionality than other implementations.
-In contrast, Apache HTTP Client supports more functionalities and more customized settings, such as expect-continue handshake and TCP KeepAlive, at cost of extra dependency and additional startup latency.</p><p>For more details of configuration, see sections <a href=#url-connection-http-client-configurations>URL Connection HTTP Client Configurations</a> and <a href=#apache-http-client-configurations>Apache HTTP Client Configurations</a>.</p><p>Configure the following property to set the  [...]
+In contrast, Apache HTTP Client supports more functionalities and more customized settings, such as expect-continue handshake and TCP KeepAlive, at cost of extra dependency and additional startup latency.</p><p>For more details of configuration, see sections <a href=#url-connection-http-client-configurations>URL Connection HTTP Client Configurations</a> and <a href=#apache-http-client-configurations>Apache HTTP Client Configurations</a>.</p><p>Configure the following property to set the  [...]
 </span></span></code></pre></div><h4 id=apache-http-client-configurations>Apache HTTP Client Configurations</h4><p>Apache HTTP Client has the following configurable properties:</p><table><thead><tr><th>Property</th><th>Default</th><th>Description</th></tr></thead><tbody><tr><td>http-client.apache.socket-timeout-ms</td><td>null</td><td>An optional <a href=https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html#socketTimeout(java.time.Dura [...]
 </span></span></code></pre></div><h2 id=run-iceberg-on-aws>Run Iceberg on AWS</h2><h3 id=amazon-athena>Amazon Athena</h3><p><a href=https://aws.amazon.com/athena/>Amazon Athena</a> provides a serverless query engine that can be used to perform read, write, update and optimization tasks against Iceberg tables.
More details can be found <a href=https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html>here</a>.</p><h3 id=amazon-emr>Amazon EMR</h3><p><a href=https://aws.amazon.com/emr/>Amazon EMR</a> can provision clusters with <a href=https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark.html>Spark</a> (EMR 6 for Spark 3, EMR 5 for Spark 2),
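The HTTP client choice and its tuning knobs are ordinary catalog properties, so they can be supplied wherever the catalog is defined. A sketch in Flink SQL, assuming the `http-client.type` switch described above and the socket-timeout property listed in the Apache client table; values are placeholders:

    CREATE CATALOG my_catalog WITH (
      'type'='iceberg',
      'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
      'warehouse'='s3://my-bucket/my/key/prefix',
      'http-client.type'='apache',
      'http-client.apache.socket-timeout-ms'='30000'
    );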
@@ -264,7 +260,6 @@ Please refer to the <a href=https://docs.aws.amazon.com/emr/latest/ReleaseGuide/
 </span></span><span style=display:flex><span>
 </span></span><span style=display:flex><span>AWS_PACKAGES<span style=color:#f92672>=(</span>
 </span></span><span style=display:flex><span>  <span style=color:#e6db74>&#34;bundle&#34;</span>
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#34;url-connection-client&#34;</span>
 </span></span><span style=display:flex><span><span style=color:#f92672>)</span>
 </span></span><span style=display:flex><span>
 </span></span><span style=display:flex><span>ICEBERG_PACKAGES<span style=color:#f92672>=(</span>
diff --git a/docs/1.2.1/configuration/index.html b/docs/1.2.1/configuration/index.html
index 7518a20f..397533ce 100644
--- a/docs/1.2.1/configuration/index.html
+++ b/docs/1.2.1/configuration/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/custom-catalog/index.html b/docs/1.2.1/custom-catalog/index.html
index 73618d37..532f5169 100644
--- a/docs/1.2.1/custom-catalog/index.html
+++ b/docs/1.2.1/custom-catalog/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class=chevron-toggle data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/dell/index.html b/docs/1.2.1/dell/index.html
index e0f83706..3ba1ce87 100644
--- a/docs/1.2.1/dell/index.html
+++ b/docs/1.2.1/dell/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class="collapse in"><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a id=active href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/evolution/index.html b/docs/1.2.1/evolution/index.html
index 3bc767e2..11264acf 100644
--- a/docs/1.2.1/evolution/index.html
+++ b/docs/1.2.1/evolution/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/schemas/index.html b/docs/1.2.1/flink-actions/index.html
similarity index 57%
copy from docs/1.2.1/schemas/index.html
copy to docs/1.2.1/flink-actions/index.html
index 9200290c..77ac1850 100644
--- a/docs/1.2.1/schemas/index.html
+++ b/docs/1.2.1/flink-actions/index.html
@@ -1,19 +1,26 @@
-<!doctype html><html><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><meta name=description content><meta name=author content><title>Schemas</title><link href=../css/bootstrap.css rel=stylesheet><link href=../css/markdown.css rel=stylesheet><link href=../css/katex.min.css rel=stylesheet><link href=../css/iceberg-theme.css rel=stylesheet><link href=../font-awesome-4.7.0/css/font-awesome.min.css r [...]
+<!doctype html><html><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><meta name=description content><meta name=author content><title>Flink Actions</title><link href=../css/bootstrap.css rel=stylesheet><link href=../css/markdown.css rel=stylesheet><link href=../css/katex.min.css rel=stylesheet><link href=../css/iceberg-theme.css rel=stylesheet><link href=../font-awesome-4.7.0/css/font-awesome.min [...]
 <span class=sr-only>Toggle navigation</span>
 <span class=icon-bar></span>
 <span class=icon-bar></span>
 <span class=icon-bar></span></button>
-<a class="page-scroll navbar-brand" href=https://iceberg.apache.org/><img class=top-navbar-logo src=https://iceberg.apache.org/docs/1.2.1//img/iceberg-logo-icon.png> Apache Iceberg</a></div><div><input type=search class=form-control id=search-input placeholder=Search... maxlength=64 data-hotkeys=s/></div><div class=versions-dropdown><span>1.2.0</span> <i class="fa fa-chevron-down"></i><div class=versions-dropdown-content><ul><li class=versions-dropdown-selection><a href=https://iceberg.a [...]
+<a class="page-scroll navbar-brand" href=https://iceberg.apache.org/><img class=top-navbar-logo src=https://iceberg.apache.org/docs/1.2.1//img/iceberg-logo-icon.png> Apache Iceberg</a></div><div><input type=search class=form-control id=search-input placeholder=Search... maxlength=64 data-hotkeys=s/></div><div class=versions-dropdown><span>1.2.0</span> <i class="fa fa-chevron-down"></i><div class=versions-dropdown-content><ul><li class=versions-dropdown-selection><a href=https://iceberg.a [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Tables class="collapse in"><ul class=sub-menu><li><a href=../configuration/>Configuration</a></li><li><a href=../evolution/>Evolution</a></li><li><a href=../maintenance/>Maintenance</a></li><li><a href=../partitioning/>Partitioning</a></li><li><a href=../performance/>Performance</a></li><li><a href=../reliability/>Reliability</a></li><li><a id=active href=../schemas/>Schemas</a></li></ul></div><li><a class="chevron-toggle collapsed" data [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Tables class=collapse><ul class=sub-menu><li><a href=../configuration/>Configuration</a></li><li><a href=../evolution/>Evolution</a></li><li><a href=../maintenance/>Maintenance</a></li><li><a href=../partitioning/>Partitioning</a></li><li><a href=../performance/>Performance</a></li><li><a href=../reliability/>Reliability</a></li><li><a href=../schemas/>Schemas</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collaps [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class=chevron-toggle data-toggle=collapse data-parent=full href=#Flink><spa [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class="collapse in"><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a id=active href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li>< [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=API class=collapse><ul class=sub-menu><li><a href=../java-api-quickstart/>Java Quickstart</a></li><li><a href=../api/>Java API</a></li><li><a href=../custom-catalog/>Java Custom Catalog</a></li></ul></div><li><a href=https://iceberg.apache.org/docs/1.2.1/../../javadoc/latest><span>Javadoc</span></a></li><li><a target=_blank href=https://py.iceberg.apache.org/><span>PyIceberg</span></a></li></div></div><div id=content class=markdown-body> [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=API class=collapse><ul class=sub-menu><li><a href=../java-api-quickstart/>Java Quickstart</a></li><li><a href=../api/>Java API</a></li><li><a href=../custom-catalog/>Java Custom Catalog</a></li></ul></div><li><a href=https://iceberg.apache.org/docs/1.2.1/../../javadoc/latest><span>Javadoc</span></a></li><li><a target=_blank href=https://py.iceberg.apache.org/><span>PyIceberg</span></a></li></div></div><div id=content class=markdown-body> [...]
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>);</span>
+</span></span><span style=display:flex><span>Table table <span style=color:#f92672>=</span> tableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>loadTable</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>RewriteDataFilesActionResult result <span style=color:#f92672>=</span> Actions<span style=color:#f92672>.</span><span style=color:#a6e22e>forTable</span><span style=color:#f92672>(</span>table<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>        <span style=color:#f92672>.</span><span style=color:#a6e22e>rewriteDataFiles</span><span style=color:#f92672>()</span>
+</span></span><span style=display:flex><span>        <span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>();</span>
+</span></span></code></pre></div><p>For more details of the rewrite files action, please refer to <a href=../../../javadoc/1.2.0/org/apache/iceberg/flink/actions/RewriteDataFilesAction.html>RewriteDataFilesAction</a></p></div><div id=toc class=markdown-body><div id=full><nav id=TableOfContents><ul><li><a href=#rewrite-files-action>Rewrite files action.</a></li></ul></nav></div></div></div></div></section></body><script src=https://iceberg.apache.org/docs/1.2.1//js/jquery-1.11.0.js></script>
 <script src=https://iceberg.apache.org/docs/1.2.1//js/jquery.easing.min.js></script>
 <script type=text/javascript src=https://iceberg.apache.org/docs/1.2.1//js/search.js></script>
 <script src=https://iceberg.apache.org/docs/1.2.1//js/bootstrap.min.js></script>
diff --git a/docs/1.2.1/flink-configuration/index.html b/docs/1.2.1/flink-configuration/index.html
new file mode 100644
index 00000000..8148b9d5
--- /dev/null
+++ b/docs/1.2.1/flink-configuration/index.html
@@ -0,0 +1,46 @@
+<!doctype html><html><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><meta name=description content><meta name=author content><title>Flink Configuration</title><link href=../css/bootstrap.css rel=stylesheet><link href=../css/markdown.css rel=stylesheet><link href=../css/katex.min.css rel=stylesheet><link href=../css/iceberg-theme.css rel=stylesheet><link href=../font-awesome-4.7.0/css/font-aweso [...]
+<span class=sr-only>Toggle navigation</span>
+<span class=icon-bar></span>
+<span class=icon-bar></span>
+<span class=icon-bar></span></button>
+<a class="page-scroll navbar-brand" href=https://iceberg.apache.org/><img class=top-navbar-logo src=https://iceberg.apache.org/docs/1.2.1//img/iceberg-logo-icon.png> Apache Iceberg</a></div><div><input type=search class=form-control id=search-input placeholder=Search... maxlength=64 data-hotkeys=s/></div><div class=versions-dropdown><span>1.2.0</span> <i class="fa fa-chevron-down"></i><div class=versions-dropdown-content><ul><li class=versions-dropdown-selection><a href=https://iceberg.a [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Tables class=collapse><ul class=sub-menu><li><a href=../configuration/>Configuration</a></li><li><a href=../evolution/>Evolution</a></li><li><a href=../maintenance/>Maintenance</a></li><li><a href=../partitioning/>Partitioning</a></li><li><a href=../performance/>Performance</a></li><li><a href=../reliability/>Reliability</a></li><li><a href=../schemas/>Schemas</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collaps [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class=chevron-toggle data-toggle=collapse data-parent=full href=#Flink><spa [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class="collapse in"><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a id=active href=../flink-configuration/>Flink Configuration</a></li></ul></div><li>< [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=API class=collapse><ul class=sub-menu><li><a href=../java-api-quickstart/>Java Quickstart</a></li><li><a href=../api/>Java API</a></li><li><a href=../custom-catalog/>Java Custom Catalog</a></li></ul></div><li><a href=https://iceberg.apache.org/docs/1.2.1/../../javadoc/latest><span>Javadoc</span></a></li><li><a target=_blank href=https://py.iceberg.apache.org/><span>PyIceberg</span></a></li></div></div><div id=content class=markdown-body> [...]
+<code>&lt;config_key></code>=<code>&lt;config_value></code> with catalog implementation config):</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>CREATE</span> <span style=color:#66d9ef>CATALOG</span> <span style=color:#f92672>&lt;</span><span style=color:#66d9ef>catalog_name</span><span style=color:#f92672>&gt;</span [...]
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;iceberg&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#f92672>`&lt;</span>config_key<span style=color:#f92672>&gt;`=`&lt;</span>config_value<span style=color:#f92672>&gt;`</span>
+</span></span><span style=display:flex><span>); 
+</span></span></code></pre></div><p>The following properties can be set globally and are not limited to a specific catalog implementation:</p><table><thead><tr><th>Property</th><th>Required</th><th>Values</th><th>Description</th></tr></thead><tbody><tr><td>type</td><td>✔️</td><td>iceberg</td><td>Must be <code>iceberg</code>.</td></tr><tr><td>catalog-type</td><td></td><td><code>hive</code>, <code>hadoop</code> or <code>rest</code></td><td><code>hive</code>, <code>hadoop</code> or <code>re [...]
+    .tableLoader(TableLoader.fromCatalog(...))
+    .assignerFactory(new SimpleSplitAssignerFactory())
+    .streaming(true)
+    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+    .startSnapshotId(3821550127947089987L)
+    .monitorInterval(Duration.ofMillis(10L)) // or .set(&#34;monitor-interval&#34;, &#34;10s&#34;) \ set(FlinkReadOptions.MONITOR_INTERVAL, &#34;10s&#34;)
+    .build()
+</code></pre><p>For Flink SQL, read options can be passed in via SQL hints like this:</p><pre tabindex=0><code>SELECT * FROM tableName /*+ OPTIONS(&#39;monitor-interval&#39;=&#39;10s&#39;) */
+...
+</code></pre><p>Options can be passed in via Flink configuration, which will be applied to the current session. Note that not all options support this mode.</p><pre tabindex=0><code>env.getConfig()
+    .getConfiguration()
+    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
+...
+</code></pre><p><code>Read option</code> has the highest priority, followed by <code>Flink configuration</code> and then <code>Table property</code>.</p><table><thead><tr><th>Read option</th><th>Flink configuration</th><th>Table property</th><th>Default</th><th>Description</th></tr></thead><tbody><tr><td>snapshot-id</td><td>N/A</td><td>N/A</td><td>null</td><td>For time travel in batch mode. Read data from the specified snapshot-id.</td></tr><tr><td>case-sensitive</td><td>connector.iceber [...]
+    .table(table)
+    .tableLoader(tableLoader)
+    .set(&#34;write-format&#34;, &#34;orc&#34;)
+    .set(FlinkWriteOptions.OVERWRITE_MODE, &#34;true&#34;);
+</code></pre><p>For Flink SQL, write options can be passed in via SQL hints like this:</p><pre tabindex=0><code>INSERT INTO tableName /*+ OPTIONS(&#39;upsert-enabled&#39;=&#39;true&#39;) */
+...
+</code></pre><table><thead><tr><th>Flink option</th><th>Default</th><th>Description</th></tr></thead><tbody><tr><td>write-format</td><td>Table write.format.default</td><td>File format to use for this write operation; parquet, avro, or orc</td></tr><tr><td>target-file-size-bytes</td><td>As per table property</td><td>Overrides this table&rsquo;s write.target-file-size-bytes</td></tr><tr><td>upsert-enabled</td><td>Table write.upsert.enabled</td><td>Overrides this table&rsquo;s write.upsert. [...]
+<script src=https://iceberg.apache.org/docs/1.2.1//js/jquery.easing.min.js></script>
+<script type=text/javascript src=https://iceberg.apache.org/docs/1.2.1//js/search.js></script>
+<script src=https://iceberg.apache.org/docs/1.2.1//js/bootstrap.min.js></script>
+<script src=https://iceberg.apache.org/docs/1.2.1//js/iceberg-theme.js></script></html>
\ No newline at end of file
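Because read and write options are also accepted as SQL hints with the priority order described on this page, the quickest way to try a setting is inline. A sketch using option names from the page's tables (`streaming`, `monitor-interval`, `write-format`, `upsert-enabled`); table names are placeholders:

    -- streaming read that checks for new snapshots every 30 seconds
    SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */;

    -- upsert write in ORC format
    INSERT INTO sample /*+ OPTIONS('write-format'='orc', 'upsert-enabled'='true') */
    SELECT id, data FROM source_table;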
diff --git a/docs/1.2.1/flink-connector/index.html b/docs/1.2.1/flink-connector/index.html
index 73c3d62f..5948572e 100644
--- a/docs/1.2.1/flink-connector/index.html
+++ b/docs/1.2.1/flink-connector/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class=chevron-toggle data-toggle=collapse data-parent=full href=#Flink><spa [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class="collapse in"><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a id=active href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li> [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class="collapse in"><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a id=active href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li>< [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/flink-ddl/index.html b/docs/1.2.1/flink-ddl/index.html
new file mode 100644
index 00000000..9eaf006e
--- /dev/null
+++ b/docs/1.2.1/flink-ddl/index.html
@@ -0,0 +1,78 @@
+<!doctype html><html><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><meta name=description content><meta name=author content><title>Flink DDL</title><link href=../css/bootstrap.css rel=stylesheet><link href=../css/markdown.css rel=stylesheet><link href=../css/katex.min.css rel=stylesheet><link href=../css/iceberg-theme.css rel=stylesheet><link href=../font-awesome-4.7.0/css/font-awesome.min.css [...]
+<span class=sr-only>Toggle navigation</span>
+<span class=icon-bar></span>
+<span class=icon-bar></span>
+<span class=icon-bar></span></button>
+<a class="page-scroll navbar-brand" href=https://iceberg.apache.org/><img class=top-navbar-logo src=https://iceberg.apache.org/docs/1.2.1//img/iceberg-logo-icon.png> Apache Iceberg</a></div><div><input type=search class=form-control id=search-input placeholder=Search... maxlength=64 data-hotkeys=s/></div><div class=versions-dropdown><span>1.2.0</span> <i class="fa fa-chevron-down"></i><div class=versions-dropdown-content><ul><li class=versions-dropdown-selection><a href=https://iceberg.a [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Tables class=collapse><ul class=sub-menu><li><a href=../configuration/>Configuration</a></li><li><a href=../evolution/>Evolution</a></li><li><a href=../maintenance/>Maintenance</a></li><li><a href=../partitioning/>Partitioning</a></li><li><a href=../performance/>Performance</a></li><li><a href=../reliability/>Reliability</a></li><li><a href=../schemas/>Schemas</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collaps [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class=chevron-toggle data-toggle=collapse data-parent=full href=#Flink><spa [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class="collapse in"><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a id=active href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li>< [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=API class=collapse><ul class=sub-menu><li><a href=../java-api-quickstart/>Java Quickstart</a></li><li><a href=../api/>Java API</a></li><li><a href=../custom-catalog/>Java Custom Catalog</a></li></ul></div><li><a href=https://iceberg.apache.org/docs/1.2.1/../../javadoc/latest><span>Javadoc</span></a></li><li><a target=_blank href=https://py.iceberg.apache.org/><span>PyIceberg</span></a></li></div></div><div id=content class=markdown-body> [...]
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;iceberg&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;catalog-type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;hive&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;uri&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;thrift://localhost:9083&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;clients&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;5&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;property-version&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;1&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;warehouse&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;hdfs://nn:8020/warehouse/path&#39;</span>
+</span></span><span style=display:flex><span>);
+</span></span></code></pre></div><p>The following properties can be set if using the Hive catalog:</p><ul><li><code>uri</code>: The Hive metastore&rsquo;s thrift URI. (Required)</li><li><code>clients</code>: The Hive metastore client pool size, default value is 2. (Optional)</li><li><code>warehouse</code>: The Hive warehouse location, users should specify this path if neither set the <code>hive-conf-dir</code> to specify a location containing a <code>hive-site.xml</code> configuration fi [...]
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;iceberg&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;catalog-type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;hadoop&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;warehouse&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;hdfs://nn:8020/warehouse/path&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;property-version&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;1&#39;</span>
+</span></span><span style=display:flex><span>);
+</span></span></code></pre></div><p>The following properties can be set if using the Hadoop catalog:</p><ul><li><code>warehouse</code>: The HDFS directory to store metadata files and data files. (Required)</li></ul><p>Execute the sql command <code>USE CATALOG hadoop_catalog</code> to set the current catalog.</p><h4 id=rest-catalog>REST catalog</h4><p>This creates an iceberg catalog named <code>rest_catalog</code> that can be configured using <code>'catalog-type'='rest'</code>, which load [...]
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;iceberg&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;catalog-type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;rest&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;uri&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;https://localhost/&#39;</span>
+</span></span><span style=display:flex><span>);
+</span></span></code></pre></div><p>The following properties can be set if using the REST catalog:</p><ul><li><code>uri</code>: The URL to the REST Catalog (Required)</li><li><code>credential</code>: A credential to exchange for a token in the OAuth2 client credentials flow (Optional)</li><li><code>token</code>: A token which will be used to interact with the server (Optional)</li></ul><h4 id=custom-catalog>Custom catalog</h4><p>Flink also supports loading a custom Iceberg <code>Catalog< [...]
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;iceberg&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;catalog-impl&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;com.my.custom.CatalogImpl&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;my-additional-catalog-config&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;my-value&#39;</span>
+</span></span><span style=display:flex><span>);
+</span></span></code></pre></div><h4 id=create-through-yaml-config>Create through YAML config</h4><p>Catalogs can be registered in <code>sql-client-defaults.yaml</code> before starting the SQL client.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-yaml data-lang=yaml><span style=display:flex><span><span style=color:#f92672>catalogs</span>: 
+</span></span><span style=display:flex><span>  - <span style=color:#f92672>name</span>: <span style=color:#ae81ff>my_catalog</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>type</span>: <span style=color:#ae81ff>iceberg</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>catalog-type</span>: <span style=color:#ae81ff>hadoop</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>warehouse</span>: <span style=color:#ae81ff>hdfs://nn:8020/warehouse/path</span>
+</span></span></code></pre></div><h4 id=create-through-sql-files>Create through SQL Files</h4><p>The Flink SQL Client supports the <code>-i</code> startup option to execute an initialization SQL file that sets up the environment when the SQL Client starts.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#75715e>-- define avail [...]
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>CREATE</span> <span style=color:#66d9ef>CATALOG</span> hive_catalog <span style=color:#66d9ef>WITH</span> (
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;iceberg&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;catalog-type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;hive&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;uri&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;thrift://localhost:9083&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;warehouse&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;hdfs://nn:8020/warehouse/path&#39;</span>
+</span></span><span style=display:flex><span>);
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>USE <span style=color:#66d9ef>CATALOG</span> hive_catalog;
+</span></span></code></pre></div><p>Use the <code>-i &lt;init.sql></code> option to initialize the SQL Client session:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-bash data-lang=bash><span style=display:flex><span>/path/to/bin/sql-client.sh -i /path/to/init.sql
+</span></span></code></pre></div><h3 id=create-database><code>CREATE DATABASE</code></h3><p>By default, Iceberg will use the <code>default</code> database in Flink. Use the following example to create a separate database to avoid creating tables under the <code>default</code> database:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><spa [...]
+</span></span><span style=display:flex><span>USE iceberg_db;
+</span></span></code></pre></div><h3 id=create-table><code>CREATE TABLE</code></h3><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>CREATE</span> <span style=color:#66d9ef>TABLE</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d [...]
+</span></span><span style=display:flex><span>    id BIGINT <span style=color:#66d9ef>COMMENT</span> <span style=color:#e6db74>&#39;unique id&#39;</span>,
+</span></span><span style=display:flex><span>    <span style=color:#66d9ef>data</span> STRING
+</span></span><span style=display:flex><span>);
+</span></span></code></pre></div><p>Table create commands support the commonly used <a href=https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/>Flink create clauses</a> including:</p><ul><li><code>PARTITIONED BY (column1, column2, ...)</code> to configure partitioning; Flink does not yet support hidden partitioning.</li><li><code>COMMENT 'table document'</code> to set a table description.</li><li><code>WITH ('key'='value', ...)</code> to set <a href=../configura [...]
+</span></span><span style=display:flex><span>                                                   id BIGINT <span style=color:#66d9ef>COMMENT</span> <span style=color:#e6db74>&#39;unique id&#39;</span>,
+</span></span><span style=display:flex><span>                                                   <span style=color:#66d9ef>data</span> STRING
+</span></span><span style=display:flex><span>) PARTITIONED <span style=color:#66d9ef>BY</span> (<span style=color:#66d9ef>data</span>);
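+
+-- A sketch combining PARTITIONED BY with the WITH clause; the table name is
+-- illustrative and the property keys come from the configuration page:
+CREATE TABLE `hive_catalog`.`default`.`sample_props` (
+    id BIGINT COMMENT &#39;unique id&#39;,
+    data STRING
+) PARTITIONED BY (data) WITH (&#39;format-version&#39;=&#39;2&#39;);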
+</span></span></code></pre></div><p>Iceberg supports hidden partitioning, but Flink doesn&rsquo;t support partitioning by a function on columns, so there is no way to declare hidden partitions in Flink DDL.</p><h3 id=create-table-like><code>CREATE TABLE LIKE</code></h3><p>To create a table with the same schema, partitioning, and table properties as another table, use <code>CREATE TABLE LIKE</code>.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size [...]
+</span></span><span style=display:flex><span>                                                   id BIGINT <span style=color:#66d9ef>COMMENT</span> <span style=color:#e6db74>&#39;unique id&#39;</span>,
+</span></span><span style=display:flex><span>                                                   <span style=color:#66d9ef>data</span> STRING
+</span></span><span style=display:flex><span>);
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span><span style=color:#66d9ef>CREATE</span> <span style=color:#66d9ef>TABLE</span>  <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample_like<span style=color:#f92672>`</span> <span style=color:#66d9ef>LIKE</span> <span style=color:#f92672>`</span>hive_catalog<span style=c [...]
+</span></span></code></pre></div><p>For more details, refer to the <a href=https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/>Flink <code>CREATE TABLE</code> documentation</a>.</p><h3 id=alter-table><code>ALTER TABLE</code></h3><p>Iceberg only supports altering table properties:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=d [...]
+</span></span></code></pre></div><h3 id=alter-table--rename-to><code>ALTER TABLE .. RENAME TO</code></h3><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>ALTER</span> <span style=color:#66d9ef>TABLE</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>< [...]
+</span></span></code></pre></div><h3 id=drop-table><code>DROP TABLE</code></h3><p>To delete a table, run:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>DROP</span> <span style=color:#66d9ef>TABLE</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</spa [...]
+</span></span></code></pre></div></div><div id=toc class=markdown-body><div id=full><nav id=TableOfContents><ul><li><a href=#ddl-commands>DDL commands</a><ul><li><a href=#create-catalog><code>CREATE Catalog</code></a></li><li><a href=#create-database><code>CREATE DATABASE</code></a></li><li><a href=#create-table><code>CREATE TABLE</code></a></li><li><a href=#partitioned-by><code>PARTITIONED BY</code></a></li><li><a href=#create-table-like><code>CREATE TABLE LIKE</code></a></li><li><a hre [...]
+<script src=https://iceberg.apache.org/docs/1.2.1//js/jquery.easing.min.js></script>
+<script type=text/javascript src=https://iceberg.apache.org/docs/1.2.1//js/search.js></script>
+<script src=https://iceberg.apache.org/docs/1.2.1//js/bootstrap.min.js></script>
+<script src=https://iceberg.apache.org/docs/1.2.1//js/iceberg-theme.js></script></html>
\ No newline at end of file
diff --git a/docs/1.2.1/flink-queries/index.html b/docs/1.2.1/flink-queries/index.html
new file mode 100644
index 00000000..e61faf68
--- /dev/null
+++ b/docs/1.2.1/flink-queries/index.html
@@ -0,0 +1,176 @@
+<!doctype html><html><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><meta name=description content><meta name=author content><title>Flink Queries</title><link href=../css/bootstrap.css rel=stylesheet><link href=../css/markdown.css rel=stylesheet><link href=../css/katex.min.css rel=stylesheet><link href=../css/iceberg-theme.css rel=stylesheet><link href=../font-awesome-4.7.0/css/font-awesome.min [...]
+<span class=sr-only>Toggle navigation</span>
+<span class=icon-bar></span>
+<span class=icon-bar></span>
+<span class=icon-bar></span></button>
+<a class="page-scroll navbar-brand" href=https://iceberg.apache.org/><img class=top-navbar-logo src=https://iceberg.apache.org/docs/1.2.1//img/iceberg-logo-icon.png> Apache Iceberg</a></div><div><input type=search class=form-control id=search-input placeholder=Search... maxlength=64 data-hotkeys=s/></div><div class=versions-dropdown><span>1.2.0</span> <i class="fa fa-chevron-down"></i><div class=versions-dropdown-content><ul><li class=versions-dropdown-selection><a href=https://iceberg.a [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Tables class=collapse><ul class=sub-menu><li><a href=../configuration/>Configuration</a></li><li><a href=../evolution/>Evolution</a></li><li><a href=../maintenance/>Maintenance</a></li><li><a href=../partitioning/>Partitioning</a></li><li><a href=../performance/>Performance</a></li><li><a href=../reliability/>Reliability</a></li><li><a href=../schemas/>Schemas</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collaps [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class=chevron-toggle data-toggle=collapse data-parent=full href=#Flink><spa [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class="collapse in"><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a id=active href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li>< [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=API class=collapse><ul class=sub-menu><li><a href=../java-api-quickstart/>Java Quickstart</a></li><li><a href=../api/>Java API</a></li><li><a href=../custom-catalog/>Java Custom Catalog</a></li></ul></div><li><a href=https://iceberg.apache.org/docs/1.2.1/../../javadoc/latest><span>Javadoc</span></a></li><li><a target=_blank href=https://py.iceberg.apache.org/><span>PyIceberg</span></a></li></div></div><div id=content class=markdown-body> [...]
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> execution.runtime<span style=color:#f92672>-</span><span style=color:#66d9ef>mode</span> <span style=color:#f92672>=</span> streaming;
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span><span style=color:#75715e>-- Execute the Flink job in batch mode for the current session context
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> execution.runtime<span style=color:#f92672>-</span><span style=color:#66d9ef>mode</span> <span style=color:#f92672>=</span> batch;
+</span></span></code></pre></div><h3 id=flink-batch-read>Flink batch read</h3><p>Submit a Flink <strong>batch</strong> job using the following statements:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#75715e>-- Execute the Flink job in batch mode for the current session context
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> execution.runtime<span style=color:#f92672>-</span><span style=color:#66d9ef>mode</span> <span style=color:#f92672>=</span> batch;
+</span></span><span style=display:flex><span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> sample;
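+
+-- Any regular query works in batch mode, for example (a sketch):
+SELECT count(*) FROM sample;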
+</span></span></code></pre></div><h3 id=flink-streaming-read>Flink streaming read</h3><p>Iceberg supports processing incremental data in Flink streaming jobs, starting from a historical snapshot-id:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#75715e>-- Submit the Flink job in streaming mode for the current session.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> execution.runtime<span style=color:#f92672>-</span><span style=color:#66d9ef>mode</span> <span style=color:#f92672>=</span> streaming;
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span><span style=color:#75715e>-- Enable this switch because streaming read SQL will provide a few job options via Flink SQL hint options.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> <span style=color:#66d9ef>table</span>.<span style=color:#66d9ef>dynamic</span><span style=color:#f92672>-</span><span style=color:#66d9ef>table</span><span style=color:#f92672>-</span><span style=color:#66d9ef>options</span>.enabled<span style=color:#f92672>=</span><span style=color:#66d9ef>true</span>;
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span><span style=color:#75715e>-- Read all the records from the current Iceberg snapshot, and then read incremental data starting from that snapshot.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> sample <span style=color:#75715e>/*+ OPTIONS(&#39;streaming&#39;=&#39;true&#39;, &#39;monitor-interval&#39;=&#39;1s&#39;)*/</span> ;
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span><span style=color:#75715e>-- Read all incremental data starting from the snapshot-id &#39;3821550127947089987&#39; (records from this snapshot will be excluded).
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> sample <span style=color:#75715e>/*+ OPTIONS(&#39;streaming&#39;=&#39;true&#39;, &#39;monitor-interval&#39;=&#39;1s&#39;, &#39;start-snapshot-id&#39;=&#39;3821550127947089987&#39;)*/</span> ;
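+
+-- The monitor interval can be tuned through the same hint mechanism, e.g.:
+SELECT * FROM sample /*+ OPTIONS(&#39;streaming&#39;=&#39;true&#39;, &#39;monitor-interval&#39;=&#39;10s&#39;)*/ ;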
+</span></span></code></pre></div><p>There are several options that can be set via Flink SQL hint options for streaming jobs; see <a href=#read-options>read options</a> for details.</p><h3 id=flip-27-source-for-sql>FLIP-27 source for SQL</h3><p>Here are the SQL settings for the <a href=https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>FLIP-27</a> source. All other SQL settings and options documented above are applicable to the FLIP-27 source.</p><div clas [...]
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> <span style=color:#66d9ef>table</span>.<span style=color:#66d9ef>exec</span>.iceberg.use<span style=color:#f92672>-</span>flip27<span style=color:#f92672>-</span><span style=color:#66d9ef>source</span> <span style=color:#f92672>=</span> <span style=color:#66d9ef>true</span>;
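+
+-- Once enabled, the same queries documented above run on the FLIP-27 source, e.g.:
+SELECT * FROM sample;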
+</span></span></code></pre></div><h2 id=reading-with-datastream>Reading with DataStream</h2><p>Iceberg supports streaming and batch reads via the Java API.</p><h3 id=batch-read>Batch Read</h3><p>This example reads all records from an Iceberg table and prints them to stdout in a Flink batch job:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:fle [...]
+</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>);</span>
+</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> batch <span style=color:#f92672>=</span> FlinkSource<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>()</span>
+</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>env</span><span style=color:#f92672>(</span>env<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>streaming</span><span style=color:#f92672>(</span><span style=color:#66d9ef>false</span><span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>build</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span><span style=color:#75715e>// Print all records to stdout.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span>batch<span style=color:#f92672>.</span><span style=color:#a6e22e>print</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span><span style=color:#75715e>// Submit and execute this batch read job.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg Batch Read&#34;</span><span style=color:#f92672>);</span>
+</span></span></code></pre></div><h3 id=streaming-read>Streaming read</h3><p>This example reads incremental records starting from snapshot-id &lsquo;3821550127947089987&rsquo; and prints them to stdout in a Flink streaming job:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>StreamExecutionEnvironment env <span style=color:#f92672>=< [...]
+</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>);</span>
+</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> stream <span style=color:#f92672>=</span> FlinkSource<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>()</span>
+</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>env</span><span style=color:#f92672>(</span>env<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>streaming</span><span style=color:#f92672>(</span><span style=color:#66d9ef>true</span><span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>startSnapshotId</span><span style=color:#f92672>(</span><span style=color:#ae81ff>3821550127947089987L</span><span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>build</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span><span style=color:#75715e>// Print all records to stdout.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span>stream<span style=color:#f92672>.</span><span style=color:#a6e22e>print</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span><span style=color:#75715e>// Submit and execute this streaming read job.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg Streaming Read&#34;</span><span style=color:#f92672>);</span>
+</span></span></code></pre></div><p>Other options can be set as well; see the <a href=../../../javadoc/1.2.0/org/apache/iceberg/flink/source/FlinkSource.html>FlinkSource#Builder</a>.</p><h2 id=reading-with-datastream-flip-27-source>Reading with DataStream (FLIP-27 source)</h2><p><a href=https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>FLIP-27 source interface</a>
+was introduced in Flink 1.12. It aims to solve several shortcomings of the old <code>SourceFunction</code>
+streaming source interface. It also unifies the source interfaces for both batch and streaming executions.
+Most source connectors (like Kafka, file) in the Flink repo have migrated to the FLIP-27 interface.
+Flink is planning to deprecate the old <code>SourceFunction</code> interface in the near future.</p><p>A FLIP-27 based Flink <code>IcebergSource</code> is added in the <code>iceberg-flink</code> module. The FLIP-27 <code>IcebergSource</code> is currently an experimental feature.</p><h3 id=batch-read-1>Batch Read</h3><p>This example reads all records from an Iceberg table and prints them to stdout in a Flink batch job:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;back [...]
+</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>);</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>IcebergSource<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> source <span style=color:#f92672>=</span> IcebergSource<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>()</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>assignerFactory</span><span style=color:#f92672>(</span><span style=color:#66d9ef>new</span> SimpleSplitAssignerFactory<span style=color:#f92672>())</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>build</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> batch <span style=color:#f92672>=</span> env<span style=color:#f92672>.</span><span style=color:#a6e22e>fromSource</span><span style=color:#f92672>(</span>
+</span></span><span style=display:flex><span>    source<span style=color:#f92672>,</span>
+</span></span><span style=display:flex><span>    WatermarkStrategy<span style=color:#f92672>.</span><span style=color:#a6e22e>noWatermarks</span><span style=color:#f92672>(),</span>
+</span></span><span style=display:flex><span>    <span style=color:#e6db74>&#34;My Iceberg Source&#34;</span><span style=color:#f92672>,</span>
+</span></span><span style=display:flex><span>    TypeInformation<span style=color:#f92672>.</span><span style=color:#a6e22e>of</span><span style=color:#f92672>(</span>RowData<span style=color:#f92672>.</span><span style=color:#a6e22e>class</span><span style=color:#f92672>));</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span><span style=color:#75715e>// Print all records to stdout.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span>batch<span style=color:#f92672>.</span><span style=color:#a6e22e>print</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span><span style=color:#75715e>// Submit and execute this batch read job.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg Batch Read&#34;</span><span style=color:#f92672>);</span>
+</span></span></code></pre></div><h3 id=streaming-read-1>Streaming read</h3><p>This example starts the streaming read from the latest table snapshot (inclusive).
+Every 60s, it polls the Iceberg table to discover new append-only snapshots.
+CDC read is not supported yet.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>StreamExecutionEnvironment env <span style=color:#f92672>=</span> StreamExecutionEnvironment<span style=color:#f92672>.</span><span style=color:#a6e22e>createLocalEnvironment</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>);</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>IcebergSource source <span style=color:#f92672>=</span> IcebergSource<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>()</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>assignerFactory</span><span style=color:#f92672>(</span><span style=color:#66d9ef>new</span> SimpleSplitAssignerFactory<span style=color:#f92672>())</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>streaming</span><span style=color:#f92672>(</span><span style=color:#66d9ef>true</span><span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>streamingStartingStrategy</span><span style=color:#f92672>(</span>StreamingStartingStrategy<span style=color:#f92672>.</span><span style=color:#a6e22e>INCREMENTAL_FROM_LATEST_SNAPSHOT</span><span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>monitorInterval</span><span style=color:#f92672>(</span>Duration<span style=color:#f92672>.</span><span style=color:#a6e22e>ofSeconds</span><span style=color:#f92672>(</span><span style=color:#ae81ff>60</span><span style=color:#f92672>))</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>build</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> stream <span style=color:#f92672>=</span> env<span style=color:#f92672>.</span><span style=color:#a6e22e>fromSource</span><span style=color:#f92672>(</span>
+</span></span><span style=display:flex><span>    source<span style=color:#f92672>,</span>
+</span></span><span style=display:flex><span>    WatermarkStrategy<span style=color:#f92672>.</span><span style=color:#a6e22e>noWatermarks</span><span style=color:#f92672>(),</span>
+</span></span><span style=display:flex><span>    <span style=color:#e6db74>&#34;My Iceberg Source&#34;</span><span style=color:#f92672>,</span>
+</span></span><span style=display:flex><span>    TypeInformation<span style=color:#f92672>.</span><span style=color:#a6e22e>of</span><span style=color:#f92672>(</span>RowData<span style=color:#f92672>.</span><span style=color:#a6e22e>class</span><span style=color:#f92672>));</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span><span style=color:#75715e>// Print all records to stdout.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span>stream<span style=color:#f92672>.</span><span style=color:#a6e22e>print</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span><span style=color:#75715e>// Submit and execute this streaming read job.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg Streaming Read&#34;</span><span style=color:#f92672>);</span>
+</span></span></code></pre></div><p>Other options can be set via the Java API; see the
+<a href=../../../javadoc/1.2.0/org/apache/iceberg/flink/source/IcebergSource.html>IcebergSource#Builder</a>.</p><h3 id=read-as-avro-genericrecord>Read as Avro GenericRecord</h3><p>The FLIP-27 Iceberg source provides <code>AvroGenericRecordReaderFunction</code>, which converts
+Flink <code>RowData</code> to Avro <code>GenericRecord</code>. You can use the converter to read from an
+Iceberg table as an Avro GenericRecord DataStream.</p><p>Please make sure the <code>flink-avro</code> jar is included in the classpath.
+Also, the <code>iceberg-flink-runtime</code> shaded bundle jar can&rsquo;t be used
+because the runtime jar shades the Avro package.
+Please use the non-shaded <code>iceberg-flink</code> jar instead.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> <span style=color:#f92672>...;</span>
+</span></span><span style=display:flex><span>Table table<span style=color:#f92672>;</span>
+</span></span><span style=display:flex><span><span style=color:#66d9ef>try</span> <span style=color:#f92672>(</span>TableLoader loader <span style=color:#f92672>=</span> tableLoader<span style=color:#f92672>)</span> <span style=color:#f92672>{</span>
+</span></span><span style=display:flex><span>    loader<span style=color:#f92672>.</span><span style=color:#a6e22e>open</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>    table <span style=color:#f92672>=</span> loader<span style=color:#f92672>.</span><span style=color:#a6e22e>loadTable</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span><span style=color:#f92672>}</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>AvroGenericRecordReaderFunction readerFunction <span style=color:#f92672>=</span> AvroGenericRecordReaderFunction<span style=color:#f92672>.</span><span style=color:#a6e22e>fromTable</span><span style=color:#f92672>(</span>table<span style=color:#f92672>);</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>IcebergSource<span style=color:#f92672>&lt;</span>GenericRecord<span style=color:#f92672>&gt;</span> source <span style=color:#f92672>=</span>
+</span></span><span style=display:flex><span>    IcebergSource<span style=color:#f92672>.&lt;</span>GenericRecord<span style=color:#f92672>&gt;</span>builder<span style=color:#f92672>()</span>
+</span></span><span style=display:flex><span>        <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>        <span style=color:#f92672>.</span><span style=color:#a6e22e>readerFunction</span><span style=color:#f92672>(</span>readerFunction<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>        <span style=color:#f92672>.</span><span style=color:#a6e22e>assignerFactory</span><span style=color:#f92672>(</span><span style=color:#66d9ef>new</span> SimpleSplitAssignerFactory<span style=color:#f92672>())</span>
+</span></span><span style=display:flex><span>        <span style=color:#f92672>...</span>
+</span></span><span style=display:flex><span>        <span style=color:#f92672>.</span><span style=color:#a6e22e>build</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>Row<span style=color:#f92672>&gt;</span> stream <span style=color:#f92672>=</span> env<span style=color:#f92672>.</span><span style=color:#a6e22e>fromSource</span><span style=color:#f92672>(</span>source<span style=color:#f92672>,</span> WatermarkStrategy<span style=color:#f92672>.</span><span style=color:#a6e22e>noWatermarks</span><span style=color:#f92672>(),</span>
+</span></span><span style=display:flex><span>    <span style=color:#e6db74>&#34;Iceberg Source as Avro GenericRecord&#34;</span><span style=color:#f92672>,</span> <span style=color:#66d9ef>new</span> GenericRecordAvroTypeInfo<span style=color:#f92672>(</span>avroSchema<span style=color:#f92672>));</span>
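+
+// Note: avroSchema above is assumed to be derived from the Iceberg schema,
+// e.g. via AvroSchemaUtil.convert(icebergSchema, table.name()).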
+</span></span></code></pre></div><h2 id=options>Options</h2><h3 id=read-options>Read options</h3><p>Flink read options are passed when configuring the Flink IcebergSource:</p><pre tabindex=0><code>IcebergSource.forRowData()
+    .tableLoader(TableLoader.fromCatalog(...))
+    .assignerFactory(new SimpleSplitAssignerFactory())
+    .streaming(true)
+    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
+    .startSnapshotId(3821550127947089987L)
+    .monitorInterval(Duration.ofMillis(10L)) // or .set(&#34;monitor-interval&#34;, &#34;10s&#34;) or .set(FlinkReadOptions.MONITOR_INTERVAL, &#34;10s&#34;)
+    .build()
+</code></pre><p>For Flink SQL, read options can be passed in via SQL hints like this:</p><pre tabindex=0><code>SELECT * FROM tableName /*+ OPTIONS(&#39;monitor-interval&#39;=&#39;10s&#39;) */
+...
+</code></pre><p>Options can be passed in via Flink configuration, which will be applied to the current session. Note that not all options support this mode.</p><pre tabindex=0><code>env.getConfig()
+    .getConfiguration()
+    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
+...
+</code></pre><p>Check out all the options here: <a href=../flink-configuration#read-options>read-options</a></p><h2 id=inspecting-tables>Inspecting tables</h2><p>To inspect a table&rsquo;s history, snapshots, and other metadata, Iceberg supports metadata tables.</p><p>Metadata tables are identified by adding the metadata table name after the original table name. For example, history for <code>db.table</code> is read using <code>db.table$history</code>.</p><h3 id=history>History</h3><p>To [...]
+</span></span></code></pre></div><table><thead><tr><th>made_current_at</th><th>snapshot_id</th><th>parent_id</th><th>is_current_ancestor</th></tr></thead><tbody><tr><td>2019-02-08 03:29:51.215</td><td>5781947118336215154</td><td>NULL</td><td>true</td></tr><tr><td>2019-02-08 03:47:55.948</td><td>5179299526185056830</td><td>5781947118336215154</td><td>true</td></tr><tr><td>2019-02-09 16:24:30.13</td><td>296410040247533544</td><td>5179299526185056830</td><td>false</td></tr><tr><td>2019-02-0 [...]
+</span></span></code></pre></div><table><thead><tr><th>timestamp</th><th>file</th><th>latest_snapshot_id</th><th>latest_schema_id</th><th>latest_sequence_number</th></tr></thead><tbody><tr><td>2022-07-28 10:43:52.93</td><td>s3://&mldr;/table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json</td><td>null</td><td>null</td><td>null</td></tr><tr><td>2022-07-28 10:43:57.487</td><td>s3://&mldr;/table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json</td><td>1702 [...]
+</span></span></code></pre></div><table><thead><tr><th>committed_at</th><th>snapshot_id</th><th>parent_id</th><th>operation</th><th>manifest_list</th><th>summary</th></tr></thead><tbody><tr><td>2019-02-08 03:29:51.215</td><td>57897183625154</td><td>null</td><td>append</td><td>s3://&mldr;/table/metadata/snap-57897183625154-1.avro</td><td>{ added-records -> 2478404, total-records -> 2478404, added-data-files -> 438, total-data-files -> 438, flink.job-id -> 2e274eecb503d85369fb390e8956c813  [...]
+</span></span><span style=display:flex><span>    h.made_current_at,
+</span></span><span style=display:flex><span>    s.<span style=color:#66d9ef>operation</span>,
+</span></span><span style=display:flex><span>    h.snapshot_id,
+</span></span><span style=display:flex><span>    h.is_current_ancestor,
+</span></span><span style=display:flex><span>    s.summary[<span style=color:#e6db74>&#39;flink.job-id&#39;</span>]
+</span></span><span style=display:flex><span><span style=color:#66d9ef>from</span> prod.db.<span style=color:#66d9ef>table</span><span style=color:#960050;background-color:#1e0010>$</span>history h
+</span></span><span style=display:flex><span><span style=color:#66d9ef>join</span> prod.db.<span style=color:#66d9ef>table</span><span style=color:#960050;background-color:#1e0010>$</span>snapshots s
+</span></span><span style=display:flex><span>  <span style=color:#66d9ef>on</span> h.snapshot_id <span style=color:#f92672>=</span> s.snapshot_id
+</span></span><span style=display:flex><span><span style=color:#66d9ef>order</span> <span style=color:#66d9ef>by</span> made_current_at
+</span></span></code></pre></div><table><thead><tr><th>made_current_at</th><th>operation</th><th>snapshot_id</th><th>is_current_ancestor</th><th>summary[flink.job-id]</th></tr></thead><tbody><tr><td>2019-02-08 03:29:51.215</td><td>append</td><td>57897183625154</td><td>true</td><td>2e274eecb503d85369fb390e8956c813</td></tr></tbody></table><h3 id=files>Files</h3><p>To show a table&rsquo;s current data files:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#2728 [...]
+</span></span></code></pre></div><table><thead><tr><th>content</th><th>file_path</th><th>file_format</th><th>spec_id</th><th>partition</th><th>record_count</th><th>file_size_in_bytes</th><th>column_sizes</th><th>value_counts</th><th>null_value_counts</th><th>nan_value_counts</th><th>lower_bounds</th><th>upper_bounds</th><th>key_metadata</th><th>split_offsets</th><th>equality_ids</th><th>sort_order_id</th></tr></thead><tbody><tr><td>0</td><td>s3:/&mldr;/table/data/00000-3-8d6d60e8-d427-48 [...]
+</span></span></code></pre></div><table><thead><tr><th>path</th><th>length</th><th>partition_spec_id</th><th>added_snapshot_id</th><th>added_data_files_count</th><th>existing_data_files_count</th><th>deleted_data_files_count</th><th>partition_summaries</th></tr></thead><tbody><tr><td>s3://&mldr;/table/metadata/45b5290b-ee61-4788-b324-b1e2735c0e10-m0.avro</td><td>4479</td><td>0</td><td>6668963634911763636</td><td>8</td><td>0</td><td>0</td><td>[[false,null,2019-05-13,2019-05-15]]</td></tr> [...]
+This usually occurs when reading from a V1 table, where <code>contains_nan</code> is not populated.</li></ol><h3 id=partitions>Partitions</h3><p>To show a table&rsquo;s current partitions:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</sp [...]
+</span></span></code></pre></div><table><thead><tr><th>partition</th><th>record_count</th><th>file_count</th><th>spec_id</th></tr></thead><tbody><tr><td>{20211001, 11}</td><td>1</td><td>1</td><td>0</td></tr><tr><td>{20211002, 11}</td><td>1</td><td>1</td><td>0</td></tr><tr><td>{20211001, 10}</td><td>1</td><td>1</td><td>0</td></tr><tr><td>{20211002, 10}</td><td>1</td><td>1</td><td>0</td></tr></tbody></table><p>Note:
+For unpartitioned tables, the partitions table will contain only the record_count and file_count columns.</p><h3 id=all-metadata-tables>All Metadata Tables</h3><p>These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots.</p><div class=danger>The &ldquo;all&rdquo; metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot.</div><h4 id=all-da [...]
+</span></span></code></pre></div><table><thead><tr><th>content</th><th>file_path</th><th>file_format</th><th>partition</th><th>record_count</th><th>file_size_in_bytes</th><th>column_sizes</th><th>value_counts</th><th>null_value_counts</th><th>nan_value_counts</th><th>lower_bounds</th><th>upper_bounds</th><th>key_metadata</th><th>split_offsets</th><th>equality_ids</th><th>sort_order_id</th></tr></thead><tbody><tr><td>0</td><td>s3://&mldr;/dt=20210102/00000-0-756e2512-49ae-45bb-aae3-c0ca47 [...]
+</span></span></code></pre></div><table><thead><tr><th>path</th><th>length</th><th>partition_spec_id</th><th>added_snapshot_id</th><th>added_data_files_count</th><th>existing_data_files_count</th><th>deleted_data_files_count</th><th>partition_summaries</th></tr></thead><tbody><tr><td>s3://&mldr;/metadata/a85f78c5-3222-4b37-b7e4-faf944425d48-m0.avro</td><td>6376</td><td>0</td><td>6272782676904868561</td><td>2</td><td>0</td><td>0</td><td>[{false, false, 20210101, 20210101}]</td></tr></tbod [...]
+This usually occurs when reading from a V1 table, where <code>contains_nan</code> is not populated.</li></ol><h3 id=references>References</h3><p>To show a table&rsquo;s known snapshot references:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>F [...]
+</span></span></code></pre></div><table><thead><tr><th>name</th><th>type</th><th>snapshot_id</th><th>max_reference_age_in_ms</th><th>min_snapshots_to_keep</th><th>max_snapshot_age_in_ms</th></tr></thead><tbody><tr><td>main</td><td>BRANCH</td><td>4686954189838128572</td><td>10</td><td>20</td><td>30</td></tr><tr><td>testTag</td><td>TAG</td><td>4686954189838128572</td><td>10</td><td>null</td><td>null</td></tr></tbody></table></div><div id=toc class=markdown-body><div id=full><nav id=TableOf [...]
+<script src=https://iceberg.apache.org/docs/1.2.1//js/jquery.easing.min.js></script>
+<script type=text/javascript src=https://iceberg.apache.org/docs/1.2.1//js/search.js></script>
+<script src=https://iceberg.apache.org/docs/1.2.1//js/bootstrap.min.js></script>
+<script src=https://iceberg.apache.org/docs/1.2.1//js/iceberg-theme.js></script></html>
\ No newline at end of file
diff --git a/docs/1.2.1/flink-writes/index.html b/docs/1.2.1/flink-writes/index.html
new file mode 100644
index 00000000..4a77e0ea
--- /dev/null
+++ b/docs/1.2.1/flink-writes/index.html
@@ -0,0 +1,106 @@
+<!doctype html><html><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><meta name=description content><meta name=author content><title>Flink Writes</title><link href=../css/bootstrap.css rel=stylesheet><link href=../css/markdown.css rel=stylesheet><link href=../css/katex.min.css rel=stylesheet><link href=../css/iceberg-theme.css rel=stylesheet><link href=../font-awesome-4.7.0/css/font-awesome.min. [...]
+<span class=sr-only>Toggle navigation</span>
+<span class=icon-bar></span>
+<span class=icon-bar></span>
+<span class=icon-bar></span></button>
+<a class="page-scroll navbar-brand" href=https://iceberg.apache.org/><img class=top-navbar-logo src=https://iceberg.apache.org/docs/1.2.1//img/iceberg-logo-icon.png> Apache Iceberg</a></div><div><input type=search class=form-control id=search-input placeholder=Search... maxlength=64 data-hotkeys=s/></div><div class=versions-dropdown><span>1.2.0</span> <i class="fa fa-chevron-down"></i><div class=versions-dropdown-content><ul><li class=versions-dropdown-selection><a href=https://iceberg.a [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Tables class=collapse><ul class=sub-menu><li><a href=../configuration/>Configuration</a></li><li><a href=../evolution/>Evolution</a></li><li><a href=../maintenance/>Maintenance</a></li><li><a href=../partitioning/>Partitioning</a></li><li><a href=../performance/>Performance</a></li><li><a href=../reliability/>Reliability</a></li><li><a href=../schemas/>Schemas</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collaps [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class=chevron-toggle data-toggle=collapse data-parent=full href=#Flink><spa [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class="collapse in"><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a id=active href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li>< [...]
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
+<i class="fa fa-chevron-right"></i>
+<i class="fa fa-chevron-down"></i></a></li><div id=API class=collapse><ul class=sub-menu><li><a href=../java-api-quickstart/>Java Quickstart</a></li><li><a href=../api/>Java API</a></li><li><a href=../custom-catalog/>Java Custom Catalog</a></li></ul></div><li><a href=https://iceberg.apache.org/docs/1.2.1/../../javadoc/latest><span>Javadoc</span></a></li><li><a target=_blank href=https://py.iceberg.apache.org/><span>PyIceberg</span></a></li></div></div><div id=content class=markdown-body> [...]
+</span></span><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> <span style=color:#66d9ef>INTO</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> <span style=color:#66d9ef>SELECT</span> id, <span style=color:#66d9ef>data</span> <span style=color:#66d [...]
+</span></span></code></pre></div><h3 id=insert-overwrite><code>INSERT OVERWRITE</code></h3><p>To replace data in the table with the result of a query, use <code>INSERT OVERWRITE</code> in a batch job (Flink streaming jobs do not support <code>INSERT OVERWRITE</code>). Overwrites are atomic operations for Iceberg tables.</p><p>Partitions that have rows produced by the SELECT query will be replaced, for example:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#2 [...]
+</span></span></code></pre></div><p>Iceberg also supports overwriting given partitions by the <code>select</code> values:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> OVERWRITE <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span styl [...]
+</span></span></code></pre></div><p>For a partitioned Iceberg table, when all of the partition columns are given values in the <code>PARTITION</code> clause, the insert targets a static partition; when only a prefix of the partition columns is given values, the query result is written into a dynamic partition.
+For an unpartitioned Iceberg table, its data is completely overwritten by <code>INSERT OVERWRITE</code>.</p><h3 id=upsert><code>UPSERT</code></h3><p>Iceberg supports <code>UPSERT</code> based on the primary key when writing data into the v2 table format. There are two ways to enable upsert.</p><ol><li>Enable the <code>UPSERT</code> mode as a table-level property <code>write.upsert.enabled</code>. Here is an example SQL statement to set the table property when creating a table. It would be [...]
+</span></span><span style=display:flex><span>  <span style=color:#f92672>`</span>id<span style=color:#f92672>`</span>  INT <span style=color:#66d9ef>UNIQUE</span> <span style=color:#66d9ef>COMMENT</span> <span style=color:#e6db74>&#39;unique id&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#f92672>`</span><span style=color:#66d9ef>data</span><span style=color:#f92672>`</span> STRING <span style=color:#66d9ef>NOT</span> <span style=color:#66d9ef>NULL</span>,
+</span></span><span style=display:flex><span> <span style=color:#66d9ef>PRIMARY</span> <span style=color:#66d9ef>KEY</span>(<span style=color:#f92672>`</span>id<span style=color:#f92672>`</span>) <span style=color:#66d9ef>NOT</span> ENFORCED
+</span></span><span style=display:flex><span>) <span style=color:#66d9ef>with</span> (<span style=color:#e6db74>&#39;format-version&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;2&#39;</span>, <span style=color:#e6db74>&#39;write.upsert.enabled&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;true&#39;</span>);
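+
+-- With upsert enabled, writing a row whose primary key already exists replaces
+-- the previous row; a sketch (the table name is illustrative):
+INSERT INTO `hive_catalog`.`default`.`sample_upsert` VALUES (1, &#39;a&#39;);
+INSERT INTO `hive_catalog`.`default`.`sample_upsert` VALUES (1, &#39;b&#39;); -- id=1 now holds &#39;b&#39;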
+</span></span></code></pre></div><ol start=2><li>Enabling <code>UPSERT</code> mode using <code>upsert-enabled</code> in the <a href=#write-options>write options</a> provides more flexibility than a table-level config. Note that you still need to use the v2 table format and specify the primary key when creating the table.</li></ol><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><s [...]
+</span></span><span style=display:flex><span>...
+</span></span></code></pre></div><div class=info>OVERWRITE and UPSERT can&rsquo;t be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.</div><h2 id=writing-with-datastream>Writing with DataStream</h2><p>Iceberg supports writing to Iceberg tables from different DataStream inputs.</p><h3 id=appending-data>Appending data.</h3><p>Flink supports writing <code>DataStream&lt;RowData></code> and <code>DataStream&lt;Row></code> to t [...]
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> input <span style=color:#f92672>=</span> <span style=color:#f92672>...</span> <span style=color:#f92672>;</span>
+</span></span><span style=display:flex><span>Configuration hadoopConf <span style=color:#f92672>=</span> <span style=color:#66d9ef>new</span> Configuration<span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>,</span> hadoopConf<span style=color:#f92672>);</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>(</span>input<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg DataStream&#34;</span><span style=color:#f92672>);</span>
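+
+// The builder exposes further tuning before append(); for example (an
+// assumption, see the FlinkSink javadoc): .writeParallelism(...) controls
+// the number of parallel sink writers.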
+</span></span></code></pre></div><p>The Iceberg API also allows users to write a generic <code>DataStream&lt;T></code> to an Iceberg table; more examples can be found in this <a href=https://github.com/apache/iceberg/blob/master/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java>unit test</a>.</p><h3 id=overwrite-data>Overwrite data</h3><p>Set the <code>overwrite</code> flag in the FlinkSink builder to overwrite the data in existing Iceberg tables:</p><div cl [...]
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> input <span style=color:#f92672>=</span> <span style=color:#f92672>...</span> <span style=color:#f92672>;</span>
+</span></span><span style=display:flex><span>Configuration hadoopConf <span style=color:#f92672>=</span> <span style=color:#66d9ef>new</span> Configuration<span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>,</span> hadoopConf<span style=color:#f92672>);</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>(</span>input<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>overwrite</span><span style=color:#f92672>(</span><span style=color:#66d9ef>true</span><span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg DataStream&#34;</span><span style=color:#f92672>);</span>
+</span></span></code></pre></div><h3 id=upsert-data>Upsert data</h3><p>Set the <code>upsert</code> flag in the FlinkSink builder to upsert data in an existing Iceberg table. The table must use the v2 table format and have a primary key.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>StreamExecutionEnvironment env <span style=color:#f92672>=</span [...]
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> input <span style=color:#f92672>=</span> <span style=color:#f92672>...</span> <span style=color:#f92672>;</span>
+</span></span><span style=display:flex><span>Configuration hadoopConf <span style=color:#f92672>=</span> <span style=color:#66d9ef>new</span> Configuration<span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>,</span> hadoopConf<span style=color:#f92672>);</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>(</span>input<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>upsert</span><span style=color:#f92672>(</span><span style=color:#66d9ef>true</span><span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg DataStream&#34;</span><span style=color:#f92672>);</span>
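+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>// Illustrative sketch (not from the original example): for a partitioned table
+</span></span><span style=display:flex><span>// in UPSERT mode, the equality fields must include the partition field, as the
+</span></span><span style=display:flex><span>// note below explains. Assuming the table is partitioned by &#34;data&#34;, the builder
+</span></span><span style=display:flex><span>// can declare this via equalityFieldColumns:
+</span></span><span style=display:flex><span>//
+</span></span><span style=display:flex><span>// FlinkSink.forRowData(input)
+</span></span><span style=display:flex><span>//     .tableLoader(tableLoader)
+</span></span><span style=display:flex><span>//     .upsert(true)
+</span></span><span style=display:flex><span>//     .equalityFieldColumns(java.util.Arrays.asList(&#34;id&#34;, &#34;data&#34;))
+</span></span><span style=display:flex><span>//     .append();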
+</span></span></code></pre></div><div class=info>OVERWRITE and UPSERT can&rsquo;t be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in the equality fields.</div><h3 id=write-with-avro-genericrecord>Write with Avro GenericRecord</h3><p>The Flink Iceberg sink provides <code>AvroGenericRecordToRowDataMapper</code>, which converts
+an Avro <code>GenericRecord</code> to Flink <code>RowData</code>. You can use the mapper to write
+an Avro GenericRecord DataStream to Iceberg.</p><p>Make sure the <code>flink-avro</code> jar is included in the classpath.
+Also note that the <code>iceberg-flink-runtime</code> shaded bundle jar can&rsquo;t be used,
+because the runtime jar shades the Avro package.
+Use the non-shaded <code>iceberg-flink</code> jar instead.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>org<span style=color:#f92672>.</span><span style=color:#a6e22e>apache</span><span style=color:#f92672>.</span><span style=color:#a6e22e>avro</span><span style=color:#f92672>.</span><spa [...]
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>Schema icebergSchema <span style=color:#f92672>=</span> table<span style=color:#f92672>.</span><span style=color:#a6e22e>schema</span><span style=color:#f92672>();</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span><span style=color:#75715e>// The Avro schema converted from the Iceberg schema can&#39;t be used,
+</span></span></span><span style=display:flex><span><span style=color:#75715e>// due to the precision difference between how the Iceberg schema (microseconds)
+</span></span></span><span style=display:flex><span><span style=color:#75715e>// and Flink&#39;s AvroToRowDataConverters (milliseconds) handle the time type.
+</span></span></span><span style=display:flex><span><span style=color:#75715e>// Instead, use the Avro schema defined directly.
+</span></span></span><span style=display:flex><span><span style=color:#75715e>// See the AvroGenericRecordToRowDataMapper Javadoc for more details.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span>org<span style=color:#f92672>.</span><span style=color:#a6e22e>apache</span><span style=color:#f92672>.</span><span style=color:#a6e22e>avro</span><span style=color:#f92672>.</span><span style=color:#a6e22e>Schema</span> avroSchema <span style=color:#f92672>=</span> AvroSchemaUtil<span style=color:#f92672>.</span><span style=color:#a6e22e>convert</span><span style=color:#f92672>(</span>icebergSchema<span [...]
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>GenericRecordAvroTypeInfo avroTypeInfo <span style=color:#f92672>=</span> <span style=color:#66d9ef>new</span> GenericRecordAvroTypeInfo<span style=color:#f92672>(</span>avroSchema<span style=color:#f92672>);</span>
+</span></span><span style=display:flex><span>RowType rowType <span style=color:#f92672>=</span> FlinkSchemaUtil<span style=color:#f92672>.</span><span style=color:#a6e22e>convert</span><span style=color:#f92672>(</span>icebergSchema<span style=color:#f92672>);</span>
+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>builderFor</span><span style=color:#f92672>(</span>
+</span></span><span style=display:flex><span>    dataStream<span style=color:#f92672>,</span>
+</span></span><span style=display:flex><span>    AvroGenericRecordToRowDataMapper<span style=color:#f92672>.</span><span style=color:#a6e22e>forAvroSchema</span><span style=color:#f92672>(</span>avroSchema<span style=color:#f92672>),</span>
+</span></span><span style=display:flex><span>    FlinkCompatibilityUtil<span style=color:#f92672>.</span><span style=color:#a6e22e>toTypeInfo</span><span style=color:#f92672>(</span>rowType<span style=color:#f92672>))</span>
+</span></span><span style=display:flex><span>  <span style=color:#f92672>.</span><span style=color:#a6e22e>table</span><span style=color:#f92672>(</span>table<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>  <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>  <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span>
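+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>// Illustrative (not part of the original example): once the sink is attached,
+</span></span><span style=display:flex><span>// submit the job as usual, assuming `env` is the StreamExecutionEnvironment
+</span></span><span style=display:flex><span>// that `dataStream` was created from:
+</span></span><span style=display:flex><span>// env.execute(&#34;Write Avro GenericRecord to Iceberg&#34;);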
+</span></span></code></pre></div><h3 id=metrics>Metrics</h3><p>The following Flink metrics are provided by the Flink Iceberg sink.</p><p>Parallel writer metrics are added under the sub-group <code>IcebergStreamWriter</code>.
+They have the following key-value tags.</p><ul><li>table: full table name (like iceberg.my_db.my_table)</li><li>subtask_index: writer subtask index, starting from 0</li></ul><table><thead><tr><th>Metric name</th><th>Metric type</th><th>Description</th></tr></thead><tbody><tr><td>lastFlushDurationMs</td><td>Gauge</td><td>The duration (in milliseconds) that writer subtasks take to flush and upload the files during a checkpoint.</td></tr><tr><td>flushedDataFiles</td><td>Counter</td><td>Number  [...]
+They have the following key-value tags.</p><ul><li>table: full table name (like iceberg.my_db.my_table)</li></ul><table><thead><tr><th>Metric name</th><th>Metric type</th><th>Description</th></tr></thead><tbody><tr><td>lastCheckpointDurationMs</td><td>Gauge</td><td>The duration (in milliseconds) that the committer operator takes to checkpoint its state.</td></tr><tr><td>lastCommitDurationMs</td><td>Gauge</td><td>The duration (in milliseconds) that the Iceberg table commit takes.</td></tr><tr><td>commit [...]
+to detect failed or missing Iceberg commits.</p><ul><li>Iceberg commits happen after successful Flink checkpoints in the <code>notifyCheckpointComplete</code> callback.
+Iceberg commits can fail (for whatever reason) while Flink checkpoints succeed.</li><li>It can also happen that <code>notifyCheckpointComplete</code> isn&rsquo;t triggered (due to a bug),
+in which case no Iceberg commits will be attempted.</li></ul><p>If the checkpoint interval (and expected Iceberg commit interval) is 5 minutes, set up an alert with a rule like <code>elapsedSecondsSinceLastSuccessfulCommit > 60 minutes</code> to detect failed or missing Iceberg commits in the past hour.</p><h2 id=options>Options</h2><h3 id=write-options>Write options</h3><p>Flink write options are passed when configuring the FlinkSink, like this:</p><div class=highlight><pre tabi [...]
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>table</span><span style=color:#f92672>(</span>table<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>set</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;write-format&#34;</span><span style=color:#f92672>,</span> <span style=color:#e6db74>&#34;orc&#34;</span><span style=color:#f92672>)</span>
+</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>set</span><span style=color:#f92672>(</span>FlinkWriteOptions<span style=color:#f92672>.</span><span style=color:#a6e22e>OVERWRITE_MODE</span><span style=color:#f92672>,</span> <span style=color:#e6db74>&#34;true&#34;</span><span style=color:#f92672>);</span>
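+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>// Illustrative: any other write option can be set the same way, either by its
+</span></span><span style=display:flex><span>// string key or via a FlinkWriteOptions constant, for example:
+</span></span><span style=display:flex><span>//     .set(&#34;target-file-size-bytes&#34;, &#34;536870912&#34;)
+</span></span><span style=display:flex><span>// See the write-options link below for the full list of supported keys.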
+</span></span></code></pre></div><p>For Flink SQL, write options can be passed in via SQL hints like this:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> <span style=color:#66d9ef>INTO</span> tableName <span style=color:#75715e>/*+ OPTIONS(&#39;upsert-enabled&#39;=&#39;true&#39;) */</span>
+</span></span><span style=display:flex><span>...
+</span></span></code></pre></div><p>Check out all the options here: <a href=../flink-configuration#write-options>write-options</a></p></div><div id=toc class=markdown-body><div id=full><nav id=TableOfContents><ul><li><a href=#writing-with-sql>Writing with SQL</a><ul><li><a href=#insert-into><code>INSERT INTO</code></a></li><li><a href=#insert-overwrite><code>INSERT OVERWRITE</code></a></li><li><a href=#upsert><code>UPSERT</code></a></li></ul></li><li><a href=#writing-with-datastream>Writ [...]
+<script src=https://iceberg.apache.org/docs/1.2.1//js/jquery.easing.min.js></script>
+<script type=text/javascript src=https://iceberg.apache.org/docs/1.2.1//js/search.js></script>
+<script src=https://iceberg.apache.org/docs/1.2.1//js/bootstrap.min.js></script>
+<script src=https://iceberg.apache.org/docs/1.2.1//js/iceberg-theme.js></script></html>
\ No newline at end of file
diff --git a/docs/1.2.1/flink/flink-actions/index.html b/docs/1.2.1/flink/flink-actions/index.html
new file mode 100644
index 00000000..69073690
--- /dev/null
+++ b/docs/1.2.1/flink/flink-actions/index.html
@@ -0,0 +1 @@
+<!doctype html><html lang=en-us><head><title>https://iceberg.apache.org/docs/1.2.1/flink-actions/</title><link rel=canonical href=https://iceberg.apache.org/docs/1.2.1/flink-actions/><meta name=robots content="noindex"><meta charset=utf-8><meta http-equiv=refresh content="0; url=https://iceberg.apache.org/docs/1.2.1/flink-actions/"></head></html>
\ No newline at end of file
diff --git a/docs/1.2.1/flink/flink-configuration/index.html b/docs/1.2.1/flink/flink-configuration/index.html
new file mode 100644
index 00000000..e91d0543
--- /dev/null
+++ b/docs/1.2.1/flink/flink-configuration/index.html
@@ -0,0 +1 @@
+<!doctype html><html lang=en-us><head><title>https://iceberg.apache.org/docs/1.2.1/flink-configuration/</title><link rel=canonical href=https://iceberg.apache.org/docs/1.2.1/flink-configuration/><meta name=robots content="noindex"><meta charset=utf-8><meta http-equiv=refresh content="0; url=https://iceberg.apache.org/docs/1.2.1/flink-configuration/"></head></html>
\ No newline at end of file
diff --git a/docs/1.2.1/flink/flink-ddl/index.html b/docs/1.2.1/flink/flink-ddl/index.html
new file mode 100644
index 00000000..318cc963
--- /dev/null
+++ b/docs/1.2.1/flink/flink-ddl/index.html
@@ -0,0 +1 @@
+<!doctype html><html lang=en-us><head><title>https://iceberg.apache.org/docs/1.2.1/flink-ddl/</title><link rel=canonical href=https://iceberg.apache.org/docs/1.2.1/flink-ddl/><meta name=robots content="noindex"><meta charset=utf-8><meta http-equiv=refresh content="0; url=https://iceberg.apache.org/docs/1.2.1/flink-ddl/"></head></html>
\ No newline at end of file
diff --git a/docs/1.2.1/flink/flink-queries/index.html b/docs/1.2.1/flink/flink-queries/index.html
new file mode 100644
index 00000000..e4b3baf6
--- /dev/null
+++ b/docs/1.2.1/flink/flink-queries/index.html
@@ -0,0 +1 @@
+<!doctype html><html lang=en-us><head><title>https://iceberg.apache.org/docs/1.2.1/flink-queries/</title><link rel=canonical href=https://iceberg.apache.org/docs/1.2.1/flink-queries/><meta name=robots content="noindex"><meta charset=utf-8><meta http-equiv=refresh content="0; url=https://iceberg.apache.org/docs/1.2.1/flink-queries/"></head></html>
\ No newline at end of file
diff --git a/docs/1.2.1/flink/flink-writes/index.html b/docs/1.2.1/flink/flink-writes/index.html
new file mode 100644
index 00000000..f8d6af1f
--- /dev/null
+++ b/docs/1.2.1/flink/flink-writes/index.html
@@ -0,0 +1 @@
+<!doctype html><html lang=en-us><head><title>https://iceberg.apache.org/docs/1.2.1/flink-writes/</title><link rel=canonical href=https://iceberg.apache.org/docs/1.2.1/flink-writes/><meta name=robots content="noindex"><meta charset=utf-8><meta http-equiv=refresh content="0; url=https://iceberg.apache.org/docs/1.2.1/flink-writes/"></head></html>
\ No newline at end of file
diff --git a/docs/1.2.1/flink/flink/index.html b/docs/1.2.1/flink/flink/index.html
new file mode 100644
index 00000000..9ac017d4
--- /dev/null
+++ b/docs/1.2.1/flink/flink/index.html
@@ -0,0 +1 @@
+<!doctype html><html lang=en-us><head><title>https://iceberg.apache.org/docs/1.2.1/flink/</title><link rel=canonical href=https://iceberg.apache.org/docs/1.2.1/flink/><meta name=robots content="noindex"><meta charset=utf-8><meta http-equiv=refresh content="0; url=https://iceberg.apache.org/docs/1.2.1/flink/"></head></html>
\ No newline at end of file
diff --git a/docs/1.2.1/flink/index.html b/docs/1.2.1/flink/index.html
index 7b3f30c8..5d4bf93e 100644
--- a/docs/1.2.1/flink/index.html
+++ b/docs/1.2.1/flink/index.html
@@ -1,4 +1,4 @@
-<!doctype html><html><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><meta name=description content><meta name=author content><title>Enabling Iceberg in Flink</title><link href=../css/bootstrap.css rel=stylesheet><link href=../css/markdown.css rel=stylesheet><link href=../css/katex.min.css rel=stylesheet><link href=../css/iceberg-theme.css rel=stylesheet><link href=../font-awesome-4.7.0/css/font [...]
+<!doctype html><html><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><meta name=description content><meta name=author content><title>Flink Getting Started</title><link href=../css/bootstrap.css rel=stylesheet><link href=../css/markdown.css rel=stylesheet><link href=../css/katex.min.css rel=stylesheet><link href=../css/iceberg-theme.css rel=stylesheet><link href=../font-awesome-4.7.0/css/font-awe [...]
 <span class=sr-only>Toggle navigation</span>
 <span class=icon-bar></span>
 <span class=icon-bar></span>
@@ -9,11 +9,11 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class=chevron-toggle data-toggle=collapse data-parent=full href=#Flink><spa [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class="collapse in"><ul class=sub-menu><li><a id=active href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li> [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class="collapse in"><ul class=sub-menu><li><a id=active href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li>< [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=API class=collapse><ul class=sub-menu><li><a href=../java-api-quickstart/>Java Quickstart</a></li><li><a href=../api/>Java API</a></li><li><a href=../custom-catalog/>Java Custom Catalog</a></li></ul></div><li><a href=https://iceberg.apache.org/docs/1.2.1/../../javadoc/latest><span>Javadoc</span></a></li><li><a target=_blank href=https://py.iceberg.apache.org/><span>PyIceberg</span></a></li></div></div><div id=content class=markdown-body> [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=API class=collapse><ul class=sub-menu><li><a href=../java-api-quickstart/>Java Quickstart</a></li><li><a href=../api/>Java API</a></li><li><a href=../custom-catalog/>Java Custom Catalog</a></li></ul></div><li><a href=https://iceberg.apache.org/docs/1.2.1/../../javadoc/latest><span>Javadoc</span></a></li><li><a target=_blank href=https://py.iceberg.apache.org/><span>PyIceberg</span></a></li></div></div><div id=content class=markdown-body> [...]
 </span></span><span style=display:flex><span>SCALA_VERSION<span style=color:#f92672>=</span>2.12
 </span></span><span style=display:flex><span>APACHE_FLINK_URL<span style=color:#f92672>=</span>https://archive.apache.org/dist/flink/
 </span></span><span style=display:flex><span>wget <span style=color:#e6db74>${</span>APACHE_FLINK_URL<span style=color:#e6db74>}</span>/flink-<span style=color:#e6db74>${</span>FLINK_VERSION<span style=color:#e6db74>}</span>/flink-<span style=color:#e6db74>${</span>FLINK_VERSION<span style=color:#e6db74>}</span>-bin-scala_<span style=color:#e6db74>${</span>SCALA_VERSION<span style=color:#e6db74>}</span>.tgz
@@ -82,7 +82,7 @@
 | +I |                  137 |                  249 |                            1.0 |
 +----+----------------------+----------------------+--------------------------------+
 5 rows in set
-</code></pre><p>For more details, please refer to the <a href=https://ci.apache.org/projects/flink/flink-docs-release-1.16/docs/dev/python/table/intro_to_table_api/>Python Table API</a>.</p><h2 id=creating-catalogs-and-using-catalogs>Creating catalogs and using catalogs.</h2><p>Flink support to create catalogs by using Flink SQL.</p><h3 id=catalog-configuration>Catalog Configuration</h3><p>A catalog is created and named by executing the following query (replace <code>&lt;catalog_name></c [...]
+</code></pre><p>For more details, please refer to the <a href=https://ci.apache.org/projects/flink/flink-docs-release-1.16/docs/dev/python/table/intro_to_table_api/>Python Table API</a>.</p><h2 id=adding-catalogs>Adding catalogs</h2><p>Flink supports creating catalogs using Flink SQL.</p><h3 id=catalog-configuration>Catalog Configuration</h3><p>A catalog is created and named by executing the following query (replace <code>&lt;catalog_name></code> with your catalog name and
 <code>&lt;config_key></code>=<code>&lt;config_value></code> with catalog implementation config):</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>CREATE</span> <span style=color:#66d9ef>CATALOG</span> <span style=color:#f92672>&lt;</span><span style=color:#66d9ef>catalog_name</span><span style=color:#f92672>&gt;</span [...]
 </span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;iceberg&#39;</span>,
 </span></span><span style=display:flex><span>  <span style=color:#f92672>`&lt;</span>config_key<span style=color:#f92672>&gt;`=`&lt;</span>config_value<span style=color:#f92672>&gt;`</span>
@@ -95,200 +95,15 @@
 </span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;property-version&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;1&#39;</span>,
 </span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;warehouse&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;hdfs://nn:8020/warehouse/path&#39;</span>
 </span></span><span style=display:flex><span>);
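+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>-- Illustrative: switch to the catalog created above (assumed here to be named
+</span></span><span style=display:flex><span>-- hive_catalog) before running DDL or queries against it.
+</span></span><span style=display:flex><span>USE CATALOG hive_catalog;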
-</span></span></code></pre></div><p>The following properties can be set if using the Hive catalog:</p><ul><li><code>uri</code>: The Hive metastore&rsquo;s thrift URI. (Required)</li><li><code>clients</code>: The Hive metastore client pool size, default value is 2. (Optional)</li><li><code>warehouse</code>: The Hive warehouse location, users should specify this path if neither set the <code>hive-conf-dir</code> to specify a location containing a <code>hive-site.xml</code> configuration fi [...]
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;iceberg&#39;</span>,
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;catalog-type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;hadoop&#39;</span>,
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;warehouse&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;hdfs://nn:8020/warehouse/path&#39;</span>,
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;property-version&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;1&#39;</span>
-</span></span><span style=display:flex><span>);
-</span></span></code></pre></div><p>The following properties can be set if using the Hadoop catalog:</p><ul><li><code>warehouse</code>: The HDFS directory to store metadata files and data files. (Required)</li></ul><p>Execute the sql command <code>USE CATALOG hadoop_catalog</code> to set the current catalog.</p><h3 id=rest-catalog>REST catalog</h3><p>This creates an iceberg catalog named <code>rest_catalog</code> that can be configured using <code>'catalog-type'='rest'</code>, which load [...]
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;iceberg&#39;</span>,
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;catalog-type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;rest&#39;</span>,
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;uri&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;https://localhost/&#39;</span>
-</span></span><span style=display:flex><span>);
-</span></span></code></pre></div><p>The following properties can be set if using the REST catalog:</p><ul><li><code>uri</code>: The URL to the REST Catalog (Required)</li><li><code>credential</code>: A credential to exchange for a token in the OAuth2 client credentials flow (Optional)</li><li><code>token</code>: A token which will be used to interact with the server (Optional)</li></ul><h3 id=custom-catalog>Custom catalog</h3><p>Flink also supports loading a custom Iceberg <code>Catalog< [...]
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;iceberg&#39;</span>,
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;catalog-impl&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;com.my.custom.CatalogImpl&#39;</span>,
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;my-additional-catalog-config&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;my-value&#39;</span>
-</span></span><span style=display:flex><span>);
-</span></span></code></pre></div><h3 id=create-through-yaml-config>Create through YAML config</h3><p>Catalogs can be registered in <code>sql-client-defaults.yaml</code> before starting the SQL client.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-yaml data-lang=yaml><span style=display:flex><span><span style=color:#f92672>catalogs</span>: 
-</span></span><span style=display:flex><span>  - <span style=color:#f92672>name</span>: <span style=color:#ae81ff>my_catalog</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>type</span>: <span style=color:#ae81ff>iceberg</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>catalog-type</span>: <span style=color:#ae81ff>hadoop</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>warehouse</span>: <span style=color:#ae81ff>hdfs://nn:8020/warehouse/path</span>
-</span></span></code></pre></div><h3 id=create-through-sql-files>Create through SQL Files</h3><p>The Flink SQL Client supports the <code>-i</code> startup option to execute an initialization SQL file to set up environment when starting up the SQL Client.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#75715e>-- define avail [...]
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>CREATE</span> <span style=color:#66d9ef>CATALOG</span> hive_catalog <span style=color:#66d9ef>WITH</span> (
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;iceberg&#39;</span>,
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;catalog-type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;hive&#39;</span>,
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;uri&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;thrift://localhost:9083&#39;</span>,
-</span></span><span style=display:flex><span>  <span style=color:#e6db74>&#39;warehouse&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;hdfs://nn:8020/warehouse/path&#39;</span>
-</span></span><span style=display:flex><span>);
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>USE <span style=color:#66d9ef>CATALOG</span> hive_catalog;
-</span></span></code></pre></div><p>Using <code>-i &lt;init.sql></code> option to initialize SQL Client session:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-bash data-lang=bash><span style=display:flex><span>/path/to/bin/sql-client.sh -i /path/to/init.sql
-</span></span></code></pre></div><h2 id=ddl-commands>DDL commands</h2><h3 id=create-database><code>CREATE DATABASE</code></h3><p>By default, Iceberg will use the <code>default</code> database in Flink. Using the following example to create a separate database in order to avoid creating tables under the <code>default</code> database:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-l [...]
-</span></span><span style=display:flex><span>USE iceberg_db;
-</span></span></code></pre></div><h3 id=create-table><code>CREATE TABLE</code></h3><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>CREATE</span> <span style=color:#66d9ef>TABLE</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d [...]
+</span></span></code></pre></div><p>The following properties can be set if using the Hive catalog:</p><ul><li><code>uri</code>: The Hive metastore&rsquo;s thrift URI. (Required)</li><li><code>clients</code>: The Hive metastore client pool size; the default value is 2. (Optional)</li><li><code>warehouse</code>: The Hive warehouse location; users should specify this path if they neither set <code>hive-conf-dir</code> to specify a location containing a <code>hive-site.xml</code> configuration fi [...]
 </span></span><span style=display:flex><span>    id BIGINT <span style=color:#66d9ef>COMMENT</span> <span style=color:#e6db74>&#39;unique id&#39;</span>,
 </span></span><span style=display:flex><span>    <span style=color:#66d9ef>data</span> STRING
 </span></span><span style=display:flex><span>);
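+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>-- Illustrative: a partitioned variant of the same table (table name assumed).
+</span></span><span style=display:flex><span>-- Flink DDL supports identity partition columns only, not Iceberg&#39;s hidden
+</span></span><span style=display:flex><span>-- partition transforms.
+</span></span><span style=display:flex><span>CREATE TABLE `hive_catalog`.`default`.`sample_partitioned` (
+</span></span><span style=display:flex><span>    id BIGINT COMMENT &#39;unique id&#39;,
+</span></span><span style=display:flex><span>    data STRING
+</span></span><span style=display:flex><span>) PARTITIONED BY (data);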
-</span></span></code></pre></div><p>Table create commands support the commonly used <a href=https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/>Flink create clauses</a> including:</p><ul><li><code>PARTITION BY (column1, column2, ...)</code> to configure partitioning, Flink does not yet support hidden partitioning.</li><li><code>COMMENT 'table document'</code> to set a table description.</li><li><code>WITH ('key'='value', ...)</code> to set <a href=../configura [...]
-</span></span><span style=display:flex><span>    id BIGINT <span style=color:#66d9ef>COMMENT</span> <span style=color:#e6db74>&#39;unique id&#39;</span>,
-</span></span><span style=display:flex><span>    <span style=color:#66d9ef>data</span> STRING
-</span></span><span style=display:flex><span>) PARTITIONED <span style=color:#66d9ef>BY</span> (<span style=color:#66d9ef>data</span>);
-</span></span></code></pre></div><p>Iceberg support hidden partition but Flink don&rsquo;t support partitioning by a function on columns, so there is no way to support hidden partition in Flink DDL.</p><h3 id=create-table-like><code>CREATE TABLE LIKE</code></h3><p>To create a table with the same schema, partitioning, and table properties as another table, use <code>CREATE TABLE LIKE</code>.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size [...]
-</span></span><span style=display:flex><span>    id BIGINT <span style=color:#66d9ef>COMMENT</span> <span style=color:#e6db74>&#39;unique id&#39;</span>,
-</span></span><span style=display:flex><span>    <span style=color:#66d9ef>data</span> STRING
-</span></span><span style=display:flex><span>);
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span><span style=color:#66d9ef>CREATE</span> <span style=color:#66d9ef>TABLE</span>  <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample_like<span style=color:#f92672>`</span> <span style=color:#66d9ef>LIKE</span> <span style=color:#f92672>`</span>hive_catalog<span style=c [...]
-</span></span></code></pre></div><p>For more details, refer to the <a href=https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/>Flink <code>CREATE TABLE</code> documentation</a>.</p><h3 id=alter-table><code>ALTER TABLE</code></h3><p>Iceberg only support altering table properties:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=d [...]
-</span></span></code></pre></div><h3 id=alter-table--rename-to><code>ALTER TABLE .. RENAME TO</code></h3><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>ALTER</span> <span style=color:#66d9ef>TABLE</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>< [...]
-</span></span></code></pre></div><h3 id=drop-table><code>DROP TABLE</code></h3><p>To delete a table, run:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>DROP</span> <span style=color:#66d9ef>TABLE</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</spa [...]
-</span></span></code></pre></div><h2 id=querying-with-sql>Querying with SQL</h2><p>Iceberg support both streaming and batch read in Flink. Execute the following sql command to switch execution mode from <code>streaming</code> to <code>batch</code>, and vice versa:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#75715e>-- Ex [...]
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> execution.runtime<span style=color:#f92672>-</span><span style=color:#66d9ef>mode</span> <span style=color:#f92672>=</span> streaming;
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span><span style=color:#75715e>-- Execute the flink job in batch mode for current session context
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> execution.runtime<span style=color:#f92672>-</span><span style=color:#66d9ef>mode</span> <span style=color:#f92672>=</span> batch;
-</span></span></code></pre></div><h3 id=flink-batch-read>Flink batch read</h3><p>Submit a Flink <strong>batch</strong> job using the following sentences:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#75715e>-- Execute the flink job in batch mode for current session context
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> execution.runtime<span style=color:#f92672>-</span><span style=color:#66d9ef>mode</span> <span style=color:#f92672>=</span> batch;
-</span></span><span style=display:flex><span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> sample;
-</span></span></code></pre></div><h3 id=flink-streaming-read>Flink streaming read</h3><p>Iceberg supports processing incremental data in flink streaming jobs which starts from a historical snapshot-id:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#75715e>-- Submit the flink job in streaming mode for current session.
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> execution.runtime<span style=color:#f92672>-</span><span style=color:#66d9ef>mode</span> <span style=color:#f92672>=</span> streaming;
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span><span style=color:#75715e>-- Enable this switch because streaming read SQL will provide few job options in flink SQL hint options.
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> <span style=color:#66d9ef>table</span>.<span style=color:#66d9ef>dynamic</span><span style=color:#f92672>-</span><span style=color:#66d9ef>table</span><span style=color:#f92672>-</span><span style=color:#66d9ef>options</span>.enabled<span style=color:#f92672>=</span><span style=color:#66d9ef>true</span>;
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span><span style=color:#75715e>-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> sample <span style=color:#75715e>/*+ OPTIONS(&#39;streaming&#39;=&#39;true&#39;, &#39;monitor-interval&#39;=&#39;1s&#39;)*/</span> ;
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span><span style=color:#75715e>-- Read all incremental data starting from the snapshot-id &#39;3821550127947089987&#39; (records from this snapshot will be excluded).
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> sample <span style=color:#75715e>/*+ OPTIONS(&#39;streaming&#39;=&#39;true&#39;, &#39;monitor-interval&#39;=&#39;1s&#39;, &#39;start-snapshot-id&#39;=&#39;3821550127947089987&#39;)*/</span> ;
-</span></span></code></pre></div><p>There are some options that could be set in Flink SQL hint options for streaming job, see <a href=#Read-options>read options</a> for details.</p><h3 id=flip-27-source-for-sql>FLIP-27 source for SQL</h3><p>Here are the SQL settings for the <a href=https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>FLIP-27</a> source. All other SQL settings and options documented above are applicable to the FLIP-27 source.</p><div clas [...]
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> <span style=color:#66d9ef>table</span>.<span style=color:#66d9ef>exec</span>.iceberg.use<span style=color:#f92672>-</span>flip27<span style=color:#f92672>-</span><span style=color:#66d9ef>source</span> <span style=color:#f92672>=</span> <span style=color:#66d9ef>true</span>;
-</span></span></code></pre></div><h2 id=writing-with-sql>Writing with SQL</h2><p>Iceberg support both <code>INSERT INTO</code> and <code>INSERT OVERWRITE</code>.</p><h3 id=insert-into><code>INSERT INTO</code></h3><p>To append new data to a table with a Flink streaming job, use <code>INSERT INTO</code>:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:fle [...]
+</span></span></code></pre></div><h2 id=writing>Writing</h2><p>To append new data to a table with a Flink streaming job, use <code>INSERT INTO</code>:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> <span style=color:#66d9ef>INTO</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f9 [...]
 </span></span><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> <span style=color:#66d9ef>INTO</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> <span style=color:#66d9ef>SELECT</span> id, <span style=color:#66d9ef>data</span> <span style=color:#66d [...]
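+</span></span><span style=display:flex><span>
+</span></span><span style=display:flex><span>-- Illustrative: write options can also be supplied per statement via SQL hints,
+</span></span><span style=display:flex><span>-- e.g. enabling upsert mode for a single INSERT (see the Flink write options docs):
+</span></span><span style=display:flex><span>-- INSERT INTO `hive_catalog`.`default`.`sample` /*+ OPTIONS(&#39;upsert-enabled&#39;=&#39;true&#39;) */ ...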
-</span></span></code></pre></div><h3 id=insert-overwrite><code>INSERT OVERWRITE</code></h3><p>To replace data in the table with the result of a query, use <code>INSERT OVERWRITE</code> in batch job (flink streaming job does not support <code>INSERT OVERWRITE</code>). Overwrites are atomic operations for Iceberg tables.</p><p>Partitions that have rows produced by the SELECT query will be replaced, for example:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#2 [...]
+</span></span></code></pre></div><p>To replace data in the table with the result of a query, use <code>INSERT OVERWRITE</code> in a batch job (Flink streaming jobs do not support <code>INSERT OVERWRITE</code>). Overwrites are atomic operations for Iceberg tables.</p><p>Partitions that have rows produced by the SELECT query will be replaced, for example:</p><div class [...]
 </span></span></code></pre></div><p>Iceberg also support overwriting given partitions by the <code>select</code> values:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> OVERWRITE <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span styl [...]
-</span></span></code></pre></div><p>For a partitioned iceberg table, when all the partition columns are set a value in <code>PARTITION</code> clause, it is inserting into a static partition, otherwise if partial partition columns (prefix part of all partition columns) are set a value in <code>PARTITION</code> clause, it is writing the query result into a dynamic partition.
-For an unpartitioned iceberg table, its data will be completely overwritten by <code>INSERT OVERWRITE</code>.</p><h3 id=upsert><code>UPSERT</code></h3><p>Iceberg supports <code>UPSERT</code> based on the primary key when writing data into v2 table format. There are two ways to enable upsert.</p><ol><li>Enable the <code>UPSERT</code> mode as table-level property <code>write.upsert.enabled</code>. Here is an example SQL statement to set the table property when creating a table. It would be [...]
-</span></span><span style=display:flex><span>  <span style=color:#f92672>`</span>id<span style=color:#f92672>`</span>  INT <span style=color:#66d9ef>UNIQUE</span> <span style=color:#66d9ef>COMMENT</span> <span style=color:#e6db74>&#39;unique id&#39;</span>,
-</span></span><span style=display:flex><span>  <span style=color:#f92672>`</span><span style=color:#66d9ef>data</span><span style=color:#f92672>`</span> STRING <span style=color:#66d9ef>NOT</span> <span style=color:#66d9ef>NULL</span>,
-</span></span><span style=display:flex><span> <span style=color:#66d9ef>PRIMARY</span> <span style=color:#66d9ef>KEY</span>(<span style=color:#f92672>`</span>id<span style=color:#f92672>`</span>) <span style=color:#66d9ef>NOT</span> ENFORCED
-</span></span><span style=display:flex><span>) <span style=color:#66d9ef>with</span> (<span style=color:#e6db74>&#39;format-version&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;2&#39;</span>, <span style=color:#e6db74>&#39;write.upsert.enabled&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;true&#39;</span>);
-</span></span></code></pre></div><ol start=2><li>Enabling <code>UPSERT</code> mode using <code>upsert-enabled</code> in the <a href=#Write-options>write options</a> provides more flexibility than a table level config. Note that you still need to use v2 table format and specify the primary key when creating the table.</li></ol><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><s [...]
-</span></span><span style=display:flex><span>...
-</span></span></code></pre></div><div class=info>OVERWRITE and UPSERT can&rsquo;t be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.</div><h2 id=reading-with-datastream>Reading with DataStream</h2><p>Iceberg support streaming or batch read in Java API now.</p><h3 id=batch-read>Batch Read</h3><p>This example will read all records from iceberg table and then print to the stdout console in flink batch job:</p><div class= [...]
-</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>);</span>
-</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> batch <span style=color:#f92672>=</span> FlinkSource<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>()</span>
-</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>env</span><span style=color:#f92672>(</span>env<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>streaming</span><span style=color:#f92672>(</span><span style=color:#66d9ef>false</span><span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>build</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span><span style=color:#75715e>// Print all records to stdout.
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span>batch<span style=color:#f92672>.</span><span style=color:#a6e22e>print</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span><span style=color:#75715e>// Submit and execute this batch read job.
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg Batch Read&#34;</span><span style=color:#f92672>);</span>
-</span></span></code></pre></div><h3 id=streaming-read>Streaming read</h3><p>This example will read incremental records which start from snapshot-id &lsquo;3821550127947089987&rsquo; and print to stdout console in flink streaming job:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>StreamExecutionEnvironment env <span style=color:#f92672>=< [...]
-</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>);</span>
-</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> stream <span style=color:#f92672>=</span> FlinkSource<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>()</span>
-</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>env</span><span style=color:#f92672>(</span>env<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>streaming</span><span style=color:#f92672>(</span><span style=color:#66d9ef>true</span><span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>startSnapshotId</span><span style=color:#f92672>(</span><span style=color:#ae81ff>3821550127947089987L</span><span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>     <span style=color:#f92672>.</span><span style=color:#a6e22e>build</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span><span style=color:#75715e>// Print all records to stdout.
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span>stream<span style=color:#f92672>.</span><span style=color:#a6e22e>print</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span><span style=color:#75715e>// Submit and execute this streaming read job.
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg Streaming Read&#34;</span><span style=color:#f92672>);</span>
-</span></span></code></pre></div><p>There are other options that can be set, please see the <a href=../../../javadoc/1.2.0/org/apache/iceberg/flink/source/FlinkSource.html>FlinkSource#Builder</a>.</p><h2 id=reading-with-datastream-flip-27-source>Reading with DataStream (FLIP-27 source)</h2><p><a href=https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>FLIP-27 source interface</a>
-was introduced in Flink 1.12. It aims to solve several shortcomings of the old <code>SourceFunction</code>
-streaming source interface. It also unifies the source interfaces for both batch and streaming executions.
-Most source connectors (like Kafka, file) in Flink repo have migrated to the FLIP-27 interface.
-Flink is planning to deprecate the old <code>SourceFunction</code> interface in the near future.</p><p>A FLIP-27 based Flink <code>IcebergSource</code> is added in <code>iceberg-flink</code> module. The FLIP-27 <code>IcebergSource</code> is currently an experimental feature.</p><h3 id=batch-read-1>Batch Read</h3><p>This example will read all records from iceberg table and then print to the stdout console in flink batch job:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;back [...]
-</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>);</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>IcebergSource<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> source <span style=color:#f92672>=</span> IcebergSource<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>()</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>assignerFactory</span><span style=color:#f92672>(</span><span style=color:#66d9ef>new</span> SimpleSplitAssignerFactory<span style=color:#f92672>())</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>build</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> batch <span style=color:#f92672>=</span> env<span style=color:#f92672>.</span><span style=color:#a6e22e>fromSource</span><span style=color:#f92672>(</span>
-</span></span><span style=display:flex><span>    source<span style=color:#f92672>,</span>
-</span></span><span style=display:flex><span>    WatermarkStrategy<span style=color:#f92672>.</span><span style=color:#a6e22e>noWatermarks</span><span style=color:#f92672>(),</span>
-</span></span><span style=display:flex><span>    <span style=color:#e6db74>&#34;My Iceberg Source&#34;</span><span style=color:#f92672>,</span>
-</span></span><span style=display:flex><span>    TypeInformation<span style=color:#f92672>.</span><span style=color:#a6e22e>of</span><span style=color:#f92672>(</span>RowData<span style=color:#f92672>.</span><span style=color:#a6e22e>class</span><span style=color:#f92672>));</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span><span style=color:#75715e>// Print all records to stdout.
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span>batch<span style=color:#f92672>.</span><span style=color:#a6e22e>print</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span><span style=color:#75715e>// Submit and execute this batch read job.
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg Batch Read&#34;</span><span style=color:#f92672>);</span>
-</span></span></code></pre></div><h3 id=streaming-read-1>Streaming read</h3><p>This example will start the streaming read from the latest table snapshot (inclusive).
-Every 60s, it polls Iceberg table to discover new append-only snapshots.
-CDC read is not supported yet.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>StreamExecutionEnvironment env <span style=color:#f92672>=</span> StreamExecutionEnvironment<span style=color:#f92672>.</span><span style=color:#a6e22e>createLocalEnvironment</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>);</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>IcebergSource source <span style=color:#f92672>=</span> IcebergSource<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>()</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>assignerFactory</span><span style=color:#f92672>(</span><span style=color:#66d9ef>new</span> SimpleSplitAssignerFactory<span style=color:#f92672>())</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>streaming</span><span style=color:#f92672>(</span><span style=color:#66d9ef>true</span><span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>streamingStartingStrategy</span><span style=color:#f92672>(</span>StreamingStartingStrategy<span style=color:#f92672>.</span><span style=color:#a6e22e>INCREMENTAL_FROM_LATEST_SNAPSHOT</span><span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>monitorInterval</span><span style=color:#f92672>(</span>Duration<span style=color:#f92672>.</span><span style=color:#a6e22e>ofSeconds</span><span style=color:#f92672>(</span><span style=color:#ae81ff>60</span><span style=color:#f92672>))</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>build</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> stream <span style=color:#f92672>=</span> env<span style=color:#f92672>.</span><span style=color:#a6e22e>fromSource</span><span style=color:#f92672>(</span>
-</span></span><span style=display:flex><span>    source<span style=color:#f92672>,</span>
-</span></span><span style=display:flex><span>    WatermarkStrategy<span style=color:#f92672>.</span><span style=color:#a6e22e>noWatermarks</span><span style=color:#f92672>(),</span>
-</span></span><span style=display:flex><span>    <span style=color:#e6db74>&#34;My Iceberg Source&#34;</span><span style=color:#f92672>,</span>
-</span></span><span style=display:flex><span>    TypeInformation<span style=color:#f92672>.</span><span style=color:#a6e22e>of</span><span style=color:#f92672>(</span>RowData<span style=color:#f92672>.</span><span style=color:#a6e22e>class</span><span style=color:#f92672>));</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span><span style=color:#75715e>// Print all records to stdout.
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span>stream<span style=color:#f92672>.</span><span style=color:#a6e22e>print</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span><span style=color:#75715e>// Submit and execute this streaming read job.
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg Streaming Read&#34;</span><span style=color:#f92672>);</span>
-</span></span></code></pre></div><p>There are other options that can be set through the Java API; please see the
-<a href=../../../javadoc/1.2.0/org/apache/iceberg/flink/source/IcebergSource.html>IcebergSource#Builder</a>.</p><h3 id=read-as-avro-genericrecord>Read as Avro GenericRecord</h3><p>The FLIP-27 Iceberg source provides <code>AvroGenericRecordReaderFunction</code>, which converts
-Flink <code>RowData</code> to Avro <code>GenericRecord</code>. You can use the converter to read from an
-Iceberg table as an Avro GenericRecord DataStream.</p><p>Please make sure the <code>flink-avro</code> jar is included in the classpath.
-Also, the <code>iceberg-flink-runtime</code> shaded bundle jar can&rsquo;t be used
-because the runtime jar shades the Avro package.
-Please use the non-shaded <code>iceberg-flink</code> jar instead.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> <span style=color:#f92672>...;</span>
-</span></span><span style=display:flex><span>Table table<span style=color:#f92672>;</span>
-</span></span><span style=display:flex><span><span style=color:#66d9ef>try</span> <span style=color:#f92672>(</span>TableLoader loader <span style=color:#f92672>=</span> tableLoader<span style=color:#f92672>)</span> <span style=color:#f92672>{</span>
-</span></span><span style=display:flex><span>    loader<span style=color:#f92672>.</span><span style=color:#a6e22e>open</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>    table <span style=color:#f92672>=</span> loader<span style=color:#f92672>.</span><span style=color:#a6e22e>loadTable</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span><span style=color:#f92672>}</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>AvroGenericRecordReaderFunction readerFunction <span style=color:#f92672>=</span> AvroGenericRecordReaderFunction<span style=color:#f92672>.</span><span style=color:#a6e22e>fromTable</span><span style=color:#f92672>(</span>table<span style=color:#f92672>);</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>IcebergSource<span style=color:#f92672>&lt;</span>GenericRecord<span style=color:#f92672>&gt;</span> source <span style=color:#f92672>=</span>
-</span></span><span style=display:flex><span>    IcebergSource<span style=color:#f92672>.&lt;</span>GenericRecord<span style=color:#f92672>&gt;</span>builder<span style=color:#f92672>()</span>
-</span></span><span style=display:flex><span>        <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>        <span style=color:#f92672>.</span><span style=color:#a6e22e>readerFunction</span><span style=color:#f92672>(</span>readerFunction<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>        <span style=color:#f92672>.</span><span style=color:#a6e22e>assignerFactory</span><span style=color:#f92672>(</span><span style=color:#66d9ef>new</span> SimpleSplitAssignerFactory<span style=color:#f92672>())</span>
-</span></span><span style=display:flex><span>        <span style=color:#f92672>...</span>
-</span></span><span style=display:flex><span>        <span style=color:#f92672>.</span><span style=color:#a6e22e>build</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>Row<span style=color:#f92672>&gt;</span> stream <span style=color:#f92672>=</span> env<span style=color:#f92672>.</span><span style=color:#a6e22e>fromSource</span><span style=color:#f92672>(</span>source<span style=color:#f92672>,</span> WatermarkStrategy<span style=color:#f92672>.</span><span style=color:#a6e22e>noWatermarks</span><span style=color:#f92672>(),</span>
-</span></span><span style=display:flex><span>    <span style=color:#e6db74>&#34;Iceberg Source as Avro GenericRecord&#34;</span><span style=color:#f92672>,</span> <span style=color:#66d9ef>new</span> GenericRecordAvroTypeInfo<span style=color:#f92672>(</span>avroSchema<span style=color:#f92672>));</span>
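-</span></span><span style=display:flex><span><span style=color:#75715e>// Note: &#34;env&#34; and &#34;avroSchema&#34; are assumed to be defined earlier in the job;</span>
-</span></span><span style=display:flex><span><span style=color:#75715e>// avroSchema must match the schema expected by the reader function.</span>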
-</span></span></code></pre></div><h2 id=writing-with-datastream>Writing with DataStream</h2><p>Iceberg supports writing to an Iceberg table from different DataStream inputs.</p><h3 id=appending-data>Appending data</h3><p>Flink supports writing <code>DataStream&lt;RowData></code> and <code>DataStream&lt;Row></code> to the sink Iceberg table natively.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=langua [...]
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> input <span style=color:#f92672>=</span> <span style=color:#f92672>...</span> <span style=color:#f92672>;</span>
-</span></span><span style=display:flex><span>Configuration hadoopConf <span style=color:#f92672>=</span> <span style=color:#66d9ef>new</span> Configuration<span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>,</span> hadoopConf<span style=color:#f92672>);</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>(</span>input<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg DataStream&#34;</span><span style=color:#f92672>);</span>
-</span></span></code></pre></div><p>The Iceberg API also allows users to write a generic <code>DataStream&lt;T></code> to an Iceberg table; more examples can be found in this <a href=https://github.com/apache/iceberg/blob/master/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java>unit test</a>.</p><h3 id=overwrite-data>Overwrite data</h3><p>Set the <code>overwrite</code> flag in the FlinkSink builder to overwrite the data in existing Iceberg tables:</p><div cl [...]
+</span></span></code></pre></div><p>Flink supports writing <code>DataStream&lt;RowData></code> and <code>DataStream&lt;Row></code> to the sink Iceberg table natively.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>StreamExecutionEnvironment env <span style=color:#f92672>=</span> <span style=color:#f92672>...;</span>
 </span></span><span style=display:flex><span>
 </span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> input <span style=color:#f92672>=</span> <span style=color:#f92672>...</span> <span style=color:#f92672>;</span>
 </span></span><span style=display:flex><span>Configuration hadoopConf <span style=color:#f92672>=</span> <span style=color:#66d9ef>new</span> Configuration<span style=color:#f92672>();</span>
@@ -296,105 +111,30 @@ Please use non-shaded <code>iceberg-flink</code> jar instead.</p><div class=high
 </span></span><span style=display:flex><span>
 </span></span><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>(</span>input<span style=color:#f92672>)</span>
 </span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>overwrite</span><span style=color:#f92672>(</span><span style=color:#66d9ef>true</span><span style=color:#f92672>)</span>
 </span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span>
 </span></span><span style=display:flex><span>
 </span></span><span style=display:flex><span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg DataStream&#34;</span><span style=color:#f92672>);</span>
-</span></span></code></pre></div><h3 id=upsert-data>Upsert data</h3><p>Set the <code>upsert</code> flag in the FlinkSink builder to upsert the data in an existing Iceberg table. The table must use the v2 table format and have a primary key.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>StreamExecutionEnvironment env <span style=color:#f92672>=</span [...]
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> input <span style=color:#f92672>=</span> <span style=color:#f92672>...</span> <span style=color:#f92672>;</span>
-</span></span><span style=display:flex><span>Configuration hadoopConf <span style=color:#f92672>=</span> <span style=color:#66d9ef>new</span> Configuration<span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>,</span> hadoopConf<span style=color:#f92672>);</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>(</span>input<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>upsert</span><span style=color:#f92672>(</span><span style=color:#66d9ef>true</span><span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>    <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg DataStream&#34;</span><span style=color:#f92672>);</span>
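-</span></span><span style=display:flex><span><span style=color:#75715e>// Note (sketch): if the table is partitioned, include the partition fields in the</span>
-</span></span><span style=display:flex><span><span style=color:#75715e>// equality fields, e.g. .equalityFieldColumns(Arrays.asList(&#34;id&#34;, &#34;dt&#34;)) on the</span>
-</span></span><span style=display:flex><span><span style=color:#75715e>// builder above (&#34;id&#34; and &#34;dt&#34; are hypothetical column names).</span>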
-</span></span></code></pre></div><div class=info>OVERWRITE and UPSERT can&rsquo;t be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in the equality fields.</div><h3 id=write-with-avro-genericrecord>Write with Avro GenericRecord</h3><p>The Flink Iceberg sink provides <code>AvroGenericRecordToRowDataMapper</code>, which converts
-Avro <code>GenericRecord</code> to Flink <code>RowData</code>. You can use the mapper to write an
-Avro GenericRecord DataStream to Iceberg.</p><p>Please make sure the <code>flink-avro</code> jar is included in the classpath.
-Also, the <code>iceberg-flink-runtime</code> shaded bundle jar can&rsquo;t be used
-because the runtime jar shades the Avro package.
-Please use the non-shaded <code>iceberg-flink</code> jar instead.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>org<span style=color:#f92672>.</span><span style=color:#a6e22e>apache</span><span style=color:#f92672>.</span><span style=color:#a6e22e>avro</span><span style=color:#f92672>.</span><spa [...]
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>Schema icebergSchema <span style=color:#f92672>=</span> table<span style=color:#f92672>.</span><span style=color:#a6e22e>schema</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span><span style=color:#75715e>// The Avro schema converted from the Iceberg schema can&#39;t be used
-</span></span></span><span style=display:flex><span><span style=color:#75715e>// due to the precision difference between how the Iceberg schema (micros)
-</span></span></span><span style=display:flex><span><span style=color:#75715e>// and Flink AvroToRowDataConverters (millis) deal with the time type.
-</span></span></span><span style=display:flex><span><span style=color:#75715e>// Instead, use the Avro schema defined directly.
-</span></span></span><span style=display:flex><span><span style=color:#75715e>// See AvroGenericRecordToRowDataMapper Javadoc for more details.
-</span></span></span><span style=display:flex><span><span style=color:#75715e></span>org<span style=color:#f92672>.</span><span style=color:#a6e22e>apache</span><span style=color:#f92672>.</span><span style=color:#a6e22e>avro</span><span style=color:#f92672>.</span><span style=color:#a6e22e>Schema</span> avroSchema <span style=color:#f92672>=</span> AvroSchemaUtil<span style=color:#f92672>.</span><span style=color:#a6e22e>convert</span><span style=color:#f92672>(</span>icebergSchema<span [...]
+</span></span></code></pre></div><h2 id=reading>Reading</h2><p>Submit a Flink <strong>batch</strong> job using the following statements:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#75715e>-- Execute the Flink job in batch mode for the current session context
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> execution.runtime<span style=color:#f92672>-</span><span style=color:#66d9ef>mode</span> <span style=color:#f92672>=</span> batch;
+</span></span><span style=display:flex><span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span>;
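+</span></span><span style=display:flex><span><span style=color:#75715e>-- Hypothetical time-travel read from a specific snapshot in batch mode;</span>
+</span></span><span style=display:flex><span><span style=color:#75715e>-- assumes table.dynamic-table-options.enabled has been set to true.</span>
+</span></span><span style=display:flex><span>SELECT * FROM `hive_catalog`.`default`.`sample` <span style=color:#75715e>/*+ OPTIONS(&#39;snapshot-id&#39;=&#39;3821550127947089987&#39;) */</span>;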
+</span></span></code></pre></div><p>Iceberg supports processing incremental data in Flink <strong>streaming</strong> jobs, which start from a historical snapshot-id:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#75715e>-- Submit the Flink job in streaming mode for the current session.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> execution.runtime<span style=color:#f92672>-</span><span style=color:#66d9ef>mode</span> <span style=color:#f92672>=</span> streaming;
 </span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>GenericRecordAvroTypeInfo avroTypeInfo <span style=color:#f92672>=</span> <span style=color:#66d9ef>new</span> GenericRecordAvroTypeInfo<span style=color:#f92672>(</span>avroSchema<span style=color:#f92672>);</span>
-</span></span><span style=display:flex><span>RowType rowType <span style=color:#f92672>=</span> FlinkSchemaUtil<span style=color:#f92672>.</span><span style=color:#a6e22e>convert</span><span style=color:#f92672>(</span>icebergSchema<span style=color:#f92672>);</span>
+</span></span><span style=display:flex><span><span style=color:#75715e>-- Enable this switch because the streaming read SQL will provide a few job options via Flink SQL hint options.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> <span style=color:#66d9ef>table</span>.<span style=color:#66d9ef>dynamic</span><span style=color:#f92672>-</span><span style=color:#66d9ef>table</span><span style=color:#f92672>-</span><span style=color:#66d9ef>options</span>.enabled<span style=color:#f92672>=</span><span style=color:#66d9ef>true</span>;
 </span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>builderFor</span><span style=color:#f92672>(</span>
-</span></span><span style=display:flex><span>    dataStream<span style=color:#f92672>,</span>
-</span></span><span style=display:flex><span>    AvroGenericRecordToRowDataMapper<span style=color:#f92672>.</span><span style=color:#a6e22e>forAvroSchema</span><span style=color:#f92672>(</span>avroSchema<span style=color:#f92672>),</span>
-</span></span><span style=display:flex><span>    FlinkCompatibilityUtil<span style=color:#f92672>.</span><span style=color:#a6e22e>toTypeInfo</span><span style=color:#f92672>(</span>rowType<span style=color:#f92672>))</span>
-</span></span><span style=display:flex><span>  <span style=color:#f92672>.</span><span style=color:#a6e22e>table</span><span style=color:#f92672>(</span>table<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>  <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>  <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span>
-</span></span></code></pre></div><h3 id=metrics>Metrics</h3><p>The following Flink metrics are provided by the Flink Iceberg sink.</p><p>Parallel writer metrics are added under the sub-group of <code>IcebergStreamWriter</code>.
-They should have the following key-value tags.</p><ul><li>table: full table name (like iceberg.my_db.my_table)</li><li>subtask_index: writer subtask index starting from 0</li></ul><table><thead><tr><th>Metric name</th><th>Metric type</th><th>Description</th></tr></thead><tbody><tr><td>lastFlushDurationMs</td><td>Gauge</td><td>The duration (in milliseconds) that writer subtasks take to flush and upload the files during checkpoint.</td></tr><tr><td>flushedDataFiles</td><td>Counter</td><td>Number  [...]
-They should have the following key-value tags.</p><ul><li>table: full table name (like iceberg.my_db.my_table)</li></ul><table><thead><tr><th>Metric name</th><th>Metric type</th><th>Description</th></tr></thead><tbody><tr><td>lastCheckpointDurationMs</td><td>Gauge</td><td>The duration (in milliseconds) that the committer operator checkpoints its state.</td></tr><tr><td>lastCommitDurationMs</td><td>Gauge</td><td>The duration (in milliseconds) that the Iceberg table commit takes.</td></tr><tr><td>commit [...]
-to detect failed or missing Iceberg commits.</p><ul><li>Iceberg commits happen after successful Flink checkpoints, in the <code>notifyCheckpointComplete</code> callback.
-It can happen that an Iceberg commit fails (for whatever reason) while the Flink checkpoint succeeds.</li><li>It can also happen that <code>notifyCheckpointComplete</code> isn&rsquo;t triggered (due to a bug).
-As a result, no Iceberg commit is attempted.</li></ul><p>If the checkpoint interval (and expected Iceberg commit interval) is 5 minutes, set up an alert with a rule like <code>elapsedSecondsSinceLastSuccessfulCommit > 60 minutes</code> to detect failed or missing Iceberg commits in the past hour.</p><h2 id=options>Options</h2><h3 id=read-options>Read options</h3><p>Flink read options are passed when configuring the Flink IcebergSource:</p><pre tabindex=0><code>IcebergSourc [...]
-    .tableLoader(TableLoader.fromCatalog(...))
-    .assignerFactory(new SimpleSplitAssignerFactory())
-    .streaming(true)
-    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
-    .startSnapshotId(3821550127947089987L)
-    .monitorInterval(Duration.ofSeconds(10L)) // or .set(&#34;monitor-interval&#34;, &#34;10s&#34;) or .set(FlinkReadOptions.MONITOR_INTERVAL, &#34;10s&#34;)
-    .build()
-</code></pre><p>For Flink SQL, read options can be passed in via SQL hints like this:</p><pre tabindex=0><code>SELECT * FROM tableName /*+ OPTIONS(&#39;monitor-interval&#39;=&#39;10s&#39;) */
-...
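-
--- Sketch: multiple read options can be combined in a single hint
-SELECT * FROM tableName /*+ OPTIONS(&#39;streaming&#39;=&#39;true&#39;, &#39;monitor-interval&#39;=&#39;30s&#39;) */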
-</code></pre><p>Options can be passed in via Flink configuration, which will be applied to the current session. Note that not all options support this mode.</p><pre tabindex=0><code>env.getConfig()
-    .getConfiguration()
-    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
-...
-</code></pre><p><code>Read option</code> has the highest priority, followed by <code>Flink configuration</code> and then <code>Table property</code>.</p><table><thead><tr><th>Read option</th><th>Flink configuration</th><th>Table property</th><th>Default</th><th>Description</th></tr></thead><tbody><tr><td>snapshot-id</td><td>N/A</td><td>N/A</td><td>null</td><td>For time travel in batch mode. Read data from the specified snapshot-id.</td></tr><tr><td>case-sensitive</td><td>connector.iceber [...]
-    .table(table)
-    .tableLoader(tableLoader)
-    .set(&#34;write-format&#34;, &#34;orc&#34;)
-    .set(FlinkWriteOptions.OVERWRITE_MODE, &#34;true&#34;);
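-
-// Sketch: options set here override the corresponding table properties
-// (e.g. write.format.default) for this write.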
-</code></pre><p>For Flink SQL, write options can be passed in via SQL hints like this:</p><pre tabindex=0><code>INSERT INTO tableName /*+ OPTIONS(&#39;upsert-enabled&#39;=&#39;true&#39;) */
-...
-</code></pre><table><thead><tr><th>Flink option</th><th>Default</th><th>Description</th></tr></thead><tbody><tr><td>write-format</td><td>Table write.format.default</td><td>File format to use for this write operation; parquet, avro, or orc</td></tr><tr><td>target-file-size-bytes</td><td>As per table property</td><td>Overrides this table&rsquo;s write.target-file-size-bytes</td></tr><tr><td>upsert-enabled</td><td>Table write.upsert.enabled</td><td>Overrides this table&rsquo;s write.upsert. [...]
-</span></span></code></pre></div><table><thead><tr><th>made_current_at</th><th>snapshot_id</th><th>parent_id</th><th>is_current_ancestor</th></tr></thead><tbody><tr><td>2019-02-08 03:29:51.215</td><td>5781947118336215154</td><td>NULL</td><td>true</td></tr><tr><td>2019-02-08 03:47:55.948</td><td>5179299526185056830</td><td>5781947118336215154</td><td>true</td></tr><tr><td>2019-02-09 16:24:30.13</td><td>296410040247533544</td><td>5179299526185056830</td><td>false</td></tr><tr><td>2019-02-0 [...]
-</span></span></code></pre></div><table><thead><tr><th>timestamp</th><th>file</th><th>latest_snapshot_id</th><th>latest_schema_id</th><th>latest_sequence_number</th></tr></thead><tbody><tr><td>2022-07-28 10:43:52.93</td><td>s3://&mldr;/table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json</td><td>null</td><td>null</td><td>null</td></tr><tr><td>2022-07-28 10:43:57.487</td><td>s3://&mldr;/table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json</td><td>1702 [...]
-</span></span></code></pre></div><table><thead><tr><th>committed_at</th><th>snapshot_id</th><th>parent_id</th><th>operation</th><th>manifest_list</th><th>summary</th></tr></thead><tbody><tr><td>2019-02-08 03:29:51.215</td><td>57897183625154</td><td>null</td><td>append</td><td>s3://&mldr;/table/metadata/snap-57897183625154-1.avro</td><td>{ added-records -> 2478404, total-records -> 2478404, added-data-files -> 438, total-data-files -> 438, flink.job-id -> 2e274eecb503d85369fb390e8956c813  [...]
-</span></span><span style=display:flex><span>    h.made_current_at,
-</span></span><span style=display:flex><span>    s.<span style=color:#66d9ef>operation</span>,
-</span></span><span style=display:flex><span>    h.snapshot_id,
-</span></span><span style=display:flex><span>    h.is_current_ancestor,
-</span></span><span style=display:flex><span>    s.summary[<span style=color:#e6db74>&#39;flink.job-id&#39;</span>]
-</span></span><span style=display:flex><span><span style=color:#66d9ef>from</span> prod.db.<span style=color:#66d9ef>table</span><span style=color:#960050;background-color:#1e0010>$</span>history h
-</span></span><span style=display:flex><span><span style=color:#66d9ef>join</span> prod.db.<span style=color:#66d9ef>table</span><span style=color:#960050;background-color:#1e0010>$</span>snapshots s
-</span></span><span style=display:flex><span>  <span style=color:#66d9ef>on</span> h.snapshot_id <span style=color:#f92672>=</span> s.snapshot_id
-</span></span><span style=display:flex><span><span style=color:#66d9ef>order</span> <span style=color:#66d9ef>by</span> made_current_at
-</span></span></code></pre></div><table><thead><tr><th>made_current_at</th><th>operation</th><th>snapshot_id</th><th>is_current_ancestor</th><th>summary[flink.job-id]</th></tr></thead><tbody><tr><td>2019-02-08 03:29:51.215</td><td>append</td><td>57897183625154</td><td>true</td><td>2e274eecb503d85369fb390e8956c813</td></tr></tbody></table><h3 id=files>Files</h3><p>To show a table&rsquo;s current data files:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#2728 [...]
-</span></span></code></pre></div><table><thead><tr><th>content</th><th>file_path</th><th>file_format</th><th>spec_id</th><th>partition</th><th>record_count</th><th>file_size_in_bytes</th><th>column_sizes</th><th>value_counts</th><th>null_value_counts</th><th>nan_value_counts</th><th>lower_bounds</th><th>upper_bounds</th><th>key_metadata</th><th>split_offsets</th><th>equality_ids</th><th>sort_order_id</th></tr></thead><tbody><tr><td>0</td><td>s3:/&mldr;/table/data/00000-3-8d6d60e8-d427-48 [...]
-</span></span></code></pre></div><table><thead><tr><th>path</th><th>length</th><th>partition_spec_id</th><th>added_snapshot_id</th><th>added_data_files_count</th><th>existing_data_files_count</th><th>deleted_data_files_count</th><th>partition_summaries</th></tr></thead><tbody><tr><td>s3://&mldr;/table/metadata/45b5290b-ee61-4788-b324-b1e2735c0e10-m0.avro</td><td>4479</td><td>0</td><td>6668963634911763636</td><td>8</td><td>0</td><td>0</td><td>[[false,null,2019-05-13,2019-05-15]]</td></tr> [...]
-This usually occurs when reading from a V1 table, where <code>contains_nan</code> is not populated.</li></ol><h3 id=partitions>Partitions</h3><p>To show a table&rsquo;s current partitions:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</sp [...]
-</span></span></code></pre></div><table><thead><tr><th>partition</th><th>record_count</th><th>file_count</th><th>spec_id</th></tr></thead><tbody><tr><td>{20211001, 11}</td><td>1</td><td>1</td><td>0</td></tr><tr><td>{20211002, 11}</td><td>1</td><td>1</td><td>0</td></tr><tr><td>{20211001, 10}</td><td>1</td><td>1</td><td>0</td></tr><tr><td>{20211002, 10}</td><td>1</td><td>1</td><td>0</td></tr></tbody></table><p>Note:
-For unpartitioned tables, the partitions table will contain only the record_count and file_count columns.</p><h3 id=all-metadata-tables>All Metadata Tables</h3><p>These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots.</p><div class=danger>The &ldquo;all&rdquo; metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot.</div><h4 id=all-da [...]
-</span></span></code></pre></div><table><thead><tr><th>content</th><th>file_path</th><th>file_format</th><th>partition</th><th>record_count</th><th>file_size_in_bytes</th><th>column_sizes</th><th>value_counts</th><th>null_value_counts</th><th>nan_value_counts</th><th>lower_bounds</th><th>upper_bounds</th><th>key_metadata</th><th>split_offsets</th><th>equality_ids</th><th>sort_order_id</th></tr></thead><tbody><tr><td>0</td><td>s3://&mldr;/dt=20210102/00000-0-756e2512-49ae-45bb-aae3-c0ca47 [...]
-</span></span></code></pre></div><table><thead><tr><th>path</th><th>length</th><th>partition_spec_id</th><th>added_snapshot_id</th><th>added_data_files_count</th><th>existing_data_files_count</th><th>deleted_data_files_count</th><th>partition_summaries</th></tr></thead><tbody><tr><td>s3://&mldr;/metadata/a85f78c5-3222-4b37-b7e4-faf944425d48-m0.avro</td><td>6376</td><td>0</td><td>6272782676904868561</td><td>2</td><td>0</td><td>0</td><td>[{false, false, 20210101, 20210101}]</td></tr></tbod [...]
-This usually occurs when reading from a V1 table, where <code>contains_nan</code> is not populated.</li></ol><h3 id=references>References</h3><p>To show a table&rsquo;s known snapshot references:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>F [...]
-</span></span></code></pre></div><table><thead><tr><th>name</th><th>type</th><th>snapshot_id</th><th>max_reference_age_in_ms</th><th>min_snapshots_to_keep</th><th>max_snapshot_age_in_ms</th></tr></thead><tbody><tr><td>main</td><td>BRANCH</td><td>4686954189838128572</td><td>10</td><td>20</td><td>30</td></tr><tr><td>testTag</td><td>TAG</td><td>4686954189838128572</td><td>10</td><td>null</td><td>null</td></tr></tbody></table><h2 id=rewrite-files-action>Rewrite files action</h2><p>Iceberg p [...]
+</span></span><span style=display:flex><span><span style=color:#75715e>-- Read all records from the current Iceberg snapshot, and then read incremental data starting from that snapshot.
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> <span style=color:#75715e>/*+ [...]
 </span></span><span style=display:flex><span>
-</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>);</span>
-</span></span><span style=display:flex><span>Table table <span style=color:#f92672>=</span> tableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>loadTable</span><span style=color:#f92672>();</span>
-</span></span><span style=display:flex><span>RewriteDataFilesActionResult result <span style=color:#f92672>=</span> Actions<span style=color:#f92672>.</span><span style=color:#a6e22e>forTable</span><span style=color:#f92672>(</span>table<span style=color:#f92672>)</span>
-</span></span><span style=display:flex><span>        <span style=color:#f92672>.</span><span style=color:#a6e22e>rewriteDataFiles</span><span style=color:#f92672>()</span>
-</span></span><span style=display:flex><span>        <span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>();</span>
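-</span></span><span style=display:flex><span><span style=color:#75715e>// Sketch: the action also exposes tuning options such as .filter(...) and</span>
-</span></span><span style=display:flex><span><span style=color:#75715e>// .targetSizeInBytes(...); see the RewriteDataFilesAction Javadoc linked below.</span>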
-</span></span></code></pre></div><p>For more documentation about the options of the rewrite files action, please see <a href=../../../javadoc/1.2.0/org/apache/iceberg/flink/actions/RewriteDataFilesAction.html>RewriteDataFilesAction</a>.</p><h2 id=type-conversion>Type conversion</h2><p>Iceberg&rsquo;s integration for Flink automatically converts between Flink and Iceberg types. When writing to a table with types that are not supported by Flink, like UUID, Iceberg will accept and convert values from the  [...]
+</span></span><span style=display:flex><span><span style=color:#75715e>-- Read all incremental data starting from the snapshot-id &#39;3821550127947089987&#39; (records from this snapshot will be excluded).
+</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> <span style=color:#75715e>/*+ [...]
+</span></span></code></pre></div><p>SQL is also the recommended way to inspect tables. To view all of the snapshots in a table, use the snapshots metadata table:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> <span style=color:#f9 [...]
+</span></span></code></pre></div><p>Iceberg supports streaming and batch reads in the Java API:</p><pre tabindex=0><code>DataStream&lt;RowData&gt; batch = FlinkSource.forRowData()
+     .env(env)
+     .tableLoader(tableLoader)
+     .streaming(false)
+     .build();
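+
+// A streaming variant (sketch, assuming the same FlinkSource builder API):
+DataStream&lt;RowData&gt; stream = FlinkSource.forRowData()
+     .env(env)
+     .tableLoader(tableLoader)
+     .streaming(true)
+     .build();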
+</code></pre><h2 id=type-conversion>Type conversion</h2><p>Iceberg&rsquo;s integration for Flink automatically converts between Flink and Iceberg types. When writing to a table with types that are not supported by Flink, like UUID, Iceberg will accept and convert values from the Flink type.</p><h3 id=flink-to-iceberg>Flink to Iceberg</h3><p>Flink types are converted to Iceberg types according to the following table:</p><table><thead><tr><th>Flink</th><th>Iceberg</th><th>Notes</th></tr></ [...]
 <script src=https://iceberg.apache.org/docs/1.2.1//js/jquery.easing.min.js></script>
 <script type=text/javascript src=https://iceberg.apache.org/docs/1.2.1//js/search.js></script>
 <script src=https://iceberg.apache.org/docs/1.2.1//js/bootstrap.min.js></script>
diff --git a/docs/1.2.1/getting-started/index.html b/docs/1.2.1/getting-started/index.html
index 09ca25ce..10d634cf 100644
--- a/docs/1.2.1/getting-started/index.html
+++ b/docs/1.2.1/getting-started/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class="collapse in"><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a id=active href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-p [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/hive/index.html b/docs/1.2.1/hive/index.html
index 74a24b75..c41f0af2 100644
--- a/docs/1.2.1/hive/index.html
+++ b/docs/1.2.1/hive/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a id=active href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li>< [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a id=active hre [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/index.html b/docs/1.2.1/index.html
index dbef8a84..6eb47835 100644
--- a/docs/1.2.1/index.html
+++ b/docs/1.2.1/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=./spark-ddl/>DDL</a></li><li><a href=./getting-started/>Getting Started</a></li><li><a href=./spark-procedures/>Procedures</a></li><li><a href=./spark-queries/>Queries</a></li><li><a href=./spark-structured-streaming/>Structured Streaming</a></li><li><a href=./spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#Flin [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=./flink/>Enabling Iceberg in Flink</a></li><li><a href=./flink-connector/>Flink Connector</a></li></ul></div><li><a href=./hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_bla [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=./flink/>Flink Getting Started</a></li><li><a href=./flink-connector/>Flink Connector</a></li><li><a href=./flink-ddl/>Flink DDL</a></li><li><a href=./flink-queries/>Flink Queries</a></li><li><a href=./flink-writes/>Flink Writes</a></li><li><a href=./flink-actions/>Flink Actions</a></li><li><a href=./flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=./hive/><span>H [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=./aws/>AWS</a></li><li><a href=./dell/>Dell</a></li><li><a href=./jdbc/>JDBC</a></li><li><a href=./nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/index.xml b/docs/1.2.1/index.xml
index 40f84b8b..0904e651 100644
--- a/docs/1.2.1/index.xml
+++ b/docs/1.2.1/index.xml
@@ -12,10 +12,20 @@ Iceberg uses Apache Spark&amp;rsquo;s DataSourceV2 API for data source and catal
 Spark 2.4 can&amp;rsquo;t create Iceberg tables with DDL, instead use Spark 3 or the Iceberg API. CREATE TABLE Spark 3 can create tables in any Iceberg catalog with the clause USING iceberg:</description></item><item><title>Dell</title><link>https://iceberg.apache.org/docs/1.2.1/dell/</link><pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate><guid>https://iceberg.apache.org/docs/1.2.1/dell/</guid><description>Iceberg Dell Integration Dell ECS Integration Iceberg can be used with Dell&amp;r [...]
 See Dell ECS for more information on Dell ECS.
 Parameters When using Dell ECS with Iceberg, these configuration parameters are required:
-Name Description ecs.s3.endpoint ECS S3 service endpoint ecs.s3.access-key-id ECS Username ecs.s3.secret-access-key S3 Secret Key warehouse The location of data and metadata The warehouse should use the following formats:</description></item><item><title>Enabling Iceberg in Flink</title><link>https://iceberg.apache.org/docs/1.2.1/flink/</link><pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate><guid>https://iceberg.apache.org/docs/1.2.1/flink/</guid><description>Flink Apache Iceberg suppor [...]
-Feature support Flink Notes SQL create catalog ✔️ SQL create database ✔️ SQL create table ✔️ SQL create table like ✔️ SQL alter table ✔️ Only support altering table properties, column and partition changes are not supported SQL drop_table ✔️ SQL select ✔️ Support both streaming and batch mode SQL insert into ✔️ ️ Support both streaming and batch mode SQL insert overwrite ✔️ ️ DataStream read ✔️ ️ DataStream append ✔️ ️ DataStream overwrite ✔️ ️ Metadata tables ️ Support Java API but does [...]
-For example, Hive table partitioning cannot change so moving from a daily partition layout to an hourly partition layout requires a new table. And because queries are dependent on partitions, queries must be rewritten for the new table.</description></item><item><title>Flink Connector</title><link>https://iceberg.apache.org/docs/1.2.1/flink-connector/</link><pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate><guid>https://iceberg.apache.org/docs/1.2.1/flink-connector/</guid><description>Fl [...]
-In Flink, the SQL CREATE TABLE test (..) WITH ('connector'='iceberg', ...) will create a Flink table in current Flink catalog (use GenericInMemoryCatalog by default), which is just mapping to the underlying iceberg table instead of maintaining iceberg table directly in current Flink catalog.</description></item><item><title>Java API</title><link>https://iceberg.apache.org/docs/1.2.1/api/</link><pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate><guid>https://iceberg.apache.org/docs/1.2.1/a [...]
+Name Description ecs.s3.endpoint ECS S3 service endpoint ecs.s3.access-key-id ECS Username ecs.s3.secret-access-key S3 Secret Key warehouse The location of data and metadata The warehouse should use the following formats:</description></item><item><title>Evolution</title><link>https://iceberg.apache.org/docs/1.2.1/evolution/</link><pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate><guid>https://iceberg.apache.org/docs/1.2.1/evolution/</guid><description>Evolution Iceberg supports in-place [...]
+For example, Hive table partitioning cannot change so moving from a daily partition layout to an hourly partition layout requires a new table. And because queries are dependent on partitions, queries must be rewritten for the new table.</description></item><item><title>Flink Actions</title><link>https://iceberg.apache.org/docs/1.2.1/flink-actions/</link><pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate><guid>https://iceberg.apache.org/docs/1.2.1/flink-actions/</guid><description>Rewrite  [...]
+import org.apache.iceberg.flink.actions.Actions; TableLoader tableLoader = TableLoader.fromHadoopTable(&amp;#34;hdfs://nn:8020/warehouse/path&amp;#34;); Table table = tableLoader.loadTable(); RewriteDataFilesActionResult result = Actions.forTable(table) .rewriteDataFiles() .execute(); For more details of the rewrite files action, please refer to RewriteDataFilesAction</description></item><item><title>Flink Configuration</title><link>https://iceberg.apache.org/docs/1.2.1/flink-configurati [...]
+CREATE CATALOG &amp;lt;catalog_name&amp;gt; WITH ( &amp;#39;type&amp;#39;=&amp;#39;iceberg&amp;#39;, `&amp;lt;config_key&amp;gt;`=`&amp;lt;config_value&amp;gt;` ); The following properties can be set globally and are not limited to a specific catalog implementation:
+Property Required Values Description type ✔️ iceberg Must be iceberg. catalog-type hive, hadoop or rest hive, hadoop or rest for built-in catalogs, or left unset for custom catalog implementations using catalog-impl.</description></item><item><title>Flink Connector</title><link>https://iceberg.apache.org/docs/1.2.1/flink-connector/</link><pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate><guid>https://iceberg.apache.org/docs/1.2.1/flink-connector/</guid><description>Flink Connector Apache [...]
+In Flink, the SQL CREATE TABLE test (..) WITH ('connector'='iceberg', ...) will create a Flink table in current Flink catalog (use GenericInMemoryCatalog by default), which is just mapping to the underlying iceberg table instead of maintaining iceberg table directly in current Flink catalog.</description></item><item><title>Flink DDL</title><link>https://iceberg.apache.org/docs/1.2.1/flink-ddl/</link><pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate><guid>https://iceberg.apache.org/docs/ [...]
+CREATE CATALOG hive_catalog WITH ( &amp;#39;type&amp;#39;=&amp;#39;iceberg&amp;#39;, &amp;#39;catalog-type&amp;#39;=&amp;#39;hive&amp;#39;, &amp;#39;uri&amp;#39;=&amp;#39;thrift://localhost:9083&amp;#39;, &amp;#39;clients&amp;#39;=&amp;#39;5&amp;#39;, &amp;#39;property-version&amp;#39;=&amp;#39;1&amp;#39;, &amp;#39;warehouse&amp;#39;=&amp;#39;hdfs://nn:8020/warehouse/path&amp;#39; ); The following properties can be set if using the Hive catalog:
+uri: The Hive metastore&amp;rsquo;s thrift URI. (Required) clients: The Hive metastore client pool size, default value is 2. (Optional) warehouse: The Hive warehouse location, users should specify this path if neither set the hive-conf-dir to specify a location containing a hive-site.</description></item><item><title>Flink Getting Started</title><link>https://iceberg.apache.org/docs/1.2.1/flink/</link><pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate><guid>https://iceberg.apache.org/docs [...]
+Feature support Flink Notes SQL create catalog ✔️ SQL create database ✔️ SQL create table ✔️ SQL create table like ✔️ SQL alter table ✔️ Only support altering table properties, column and partition changes are not supported SQL drop_table ✔️ SQL select ✔️ Support both streaming and batch mode SQL insert into ✔️ ️ Support both streaming and batch mode SQL insert overwrite ✔️ ️ DataStream read ✔️ ️ DataStream append ✔️ ️ DataStream overwrite ✔️ ️ Metadata tables ✔️ Rewrite files action ✔️  [...]
+Reading with SQL Iceberg supports both streaming and batch read in Flink. Execute the following SQL command to switch execution mode from streaming to batch, and vice versa:
+-- Execute the flink job in streaming mode for current session context SET execution.runtime-mode = streaming; -- Execute the flink job in batch mode for current session context SET execution.</description></item><item><title>Flink Writes</title><link>https://iceberg.apache.org/docs/1.2.1/flink-writes/</link><pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate><guid>https://iceberg.apache.org/docs/1.2.1/flink-writes/</guid><description>Flink Writes Iceberg supports batch and streaming writes [...]
+Writing with SQL Iceberg supports both INSERT INTO and INSERT OVERWRITE.
+INSERT INTO To append new data to a table with a Flink streaming job, use INSERT INTO:
+INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, &amp;#39;a&amp;#39;); INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from other_kafka_table; INSERT OVERWRITE To replace data in the table with the result of a query, use INSERT OVERWRITE in a batch job (Flink streaming jobs do not support INSERT OVERWRITE).</description></item><item><title>Java API</title><link>https://iceberg.apache.org/docs/1.2.1/api/</link><pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate><guid>ht [...]
 Table metadata and operations are accessed through the Table interface. This interface will return table information.
 Table metadata The Table interface provides access to the table metadata:
 schema returns the current table schema spec returns the current table partition spec properties returns a map of key-value properties currentSnapshot returns the current table snapshot snapshots returns all valid snapshots for the table snapshot(id) returns a specific snapshot by ID location returns the table&amp;rsquo;s base location Tables also provide refresh to update the table to the latest version, and expose helpers:</description></item><item><title>Java Custom Catalog</title><li [...]
diff --git a/docs/1.2.1/java-api-quickstart/index.html b/docs/1.2.1/java-api-quickstart/index.html
index 11cbc226..c12e5c5c 100644
--- a/docs/1.2.1/java-api-quickstart/index.html
+++ b/docs/1.2.1/java-api-quickstart/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class=chevron-toggle data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/jdbc/index.html b/docs/1.2.1/jdbc/index.html
index ad8b1403..9b07af03 100644
--- a/docs/1.2.1/jdbc/index.html
+++ b/docs/1.2.1/jdbc/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class="collapse in"><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a id=active href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/maintenance/index.html b/docs/1.2.1/maintenance/index.html
index c07185c1..67248aed 100644
--- a/docs/1.2.1/maintenance/index.html
+++ b/docs/1.2.1/maintenance/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/nessie/index.html b/docs/1.2.1/nessie/index.html
index 0574aa43..096c2e0e 100644
--- a/docs/1.2.1/nessie/index.html
+++ b/docs/1.2.1/nessie/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class="collapse in"><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a id=active href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/partitioning/index.html b/docs/1.2.1/partitioning/index.html
index d11339eb..db910562 100644
--- a/docs/1.2.1/partitioning/index.html
+++ b/docs/1.2.1/partitioning/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/performance/index.html b/docs/1.2.1/performance/index.html
index db2740f1..64ee8013 100644
--- a/docs/1.2.1/performance/index.html
+++ b/docs/1.2.1/performance/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/reliability/index.html b/docs/1.2.1/reliability/index.html
index aae51ebb..e3a27242 100644
--- a/docs/1.2.1/reliability/index.html
+++ b/docs/1.2.1/reliability/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/schemas/index.html b/docs/1.2.1/schemas/index.html
index 9200290c..ad1f0e48 100644
--- a/docs/1.2.1/schemas/index.html
+++ b/docs/1.2.1/schemas/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/sitemap.xml b/docs/1.2.1/sitemap.xml
index 1b8a135e..39ee65fc 100644
--- a/docs/1.2.1/sitemap.xml
+++ b/docs/1.2.1/sitemap.xml
@@ -1 +1 @@
-<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9" xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>https://iceberg.apache.org/docs/1.2.1/getting-started/</loc></url><url><loc>https://iceberg.apache.org/docs/1.2.1/hive/</loc></url><url><loc>https://iceberg.apache.org/docs/1.2.1/aws/</loc></url><url><loc>https://iceberg.apache.org/docs/1.2.1/categories/</loc></url><url><loc>https://iceberg.apache.org/docs/1.2.1/configurat [...]
\ No newline at end of file
+<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9" xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>https://iceberg.apache.org/docs/1.2.1/getting-started/</loc></url><url><loc>https://iceberg.apache.org/docs/1.2.1/hive/</loc></url><url><loc>https://iceberg.apache.org/docs/1.2.1/aws/</loc></url><url><loc>https://iceberg.apache.org/docs/1.2.1/categories/</loc></url><url><loc>https://iceberg.apache.org/docs/1.2.1/configurat [...]
\ No newline at end of file
diff --git a/docs/1.2.1/spark-configuration/index.html b/docs/1.2.1/spark-configuration/index.html
index db24700b..718e93c2 100644
--- a/docs/1.2.1/spark-configuration/index.html
+++ b/docs/1.2.1/spark-configuration/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
@@ -20,7 +20,7 @@
 </span></span></code></pre></div><p>Iceberg also supports a directory-based catalog in HDFS that can be configured using <code>type=hadoop</code>:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-plain data-lang=plain><span style=display:flex><span>spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
 </span></span><span style=display:flex><span>spark.sql.catalog.hadoop_prod.type = hadoop
 </span></span><span style=display:flex><span>spark.sql.catalog.hadoop_prod.warehouse = hdfs://nn:8020/warehouse/path
-</span></span></code></pre></div><div class=info>The Hive-based catalog only loads Iceberg tables. To load non-Iceberg tables in the same Hive metastore, use a <a href=#replacing-the-session-catalog>session catalog</a>.</div><h3 id=catalog-configuration>Catalog configuration</h3><p>A catalog is created and named by adding a property <code>spark.sql.catalog.(catalog-name)</code> with an implementation class for its value.</p><p>Iceberg supplies two implementations:</p><ul><li><code>org.ap [...]
+</span></span></code></pre></div><div class=info>The Hive-based catalog only loads Iceberg tables. To load non-Iceberg tables in the same Hive metastore, use a <a href=#replacing-the-session-catalog>session catalog</a>.</div><h3 id=catalog-configuration>Catalog configuration</h3><p>A catalog is created and named by adding a property <code>spark.sql.catalog.(catalog-name)</code> with an implementation class for its value.</p><p>Iceberg supplies two implementations:</p><ul><li><code>org.ap [...]
 </span></span></span></code></pre></div><p>Spark 3 keeps track of the current catalog and namespace, which can be omitted from table names.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span>USE hive_prod.db;
 </span></span><span style=display:flex><span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> <span style=color:#66d9ef>table</span> <span style=color:#75715e>-- load db.table from catalog hive_prod
 </span></span></span></code></pre></div><p>To see the current catalog and namespace, run <code>SHOW CURRENT NAMESPACE</code>.</p><h3 id=replacing-the-session-catalog>Replacing the session catalog</h3><p>To add Iceberg table support to Spark&rsquo;s built-in catalog, configure <code>spark_catalog</code> to use Iceberg&rsquo;s <code>SparkSessionCatalog</code>.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code [...]
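The session-catalog configuration above is cut off by the archive; a minimal sketch of the setup the docs describe (assuming a Hive metastore, following the same property pattern as the hive_prod/hadoop_prod examples earlier in the hunk):

    spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
    spark.sql.catalog.spark_catalog.type = hive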
diff --git a/docs/1.2.1/spark-ddl/index.html b/docs/1.2.1/spark-ddl/index.html
index 08d92a4b..5bfe1312 100644
--- a/docs/1.2.1/spark-ddl/index.html
+++ b/docs/1.2.1/spark-ddl/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class="collapse in"><ul class=sub-menu><li><a id=active href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-p [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/spark-procedures/index.html b/docs/1.2.1/spark-procedures/index.html
index 67f4cb16..b7028fc1 100644
--- a/docs/1.2.1/spark-procedures/index.html
+++ b/docs/1.2.1/spark-procedures/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class="collapse in"><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a id=active href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-p [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
@@ -79,7 +79,34 @@ Only use this procedure when the table is no longer registered in an existing ca
D as an argument would return A -> B -> C -> D</p></blockquote><h4 id=output-12>Output</h4><table><thead><tr><th>Output Name</th><th>Type</th><th>Description</th></tr></thead><tbody><tr><td><code>snapshot_id</code></td><td>long</td><td>the ancestor snapshot id</td></tr><tr><td><code>timestamp</code></td><td>long</td><td>snapshot creation time</td></tr></tbody></table><h4 id=examples-9>Examples</h4><p>Get all the snapshot ancestors of current snapshots (default)</p><div class=highlight><pre [...]
 </span></span></code></pre></div><p>Get all the snapshot ancestors by a particular snapshot</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>CALL</span> spark_catalog.<span style=color:#66d9ef>system</span>.ancestors_of(<span style=color:#e6db74>&#39;db.tbl&#39;</span>, <span style=color:#ae81ff>1</span>)
 </span></span><span style=display:flex><span><span style=color:#66d9ef>CALL</span> spark_catalog.<span style=color:#66d9ef>system</span>.ancestors_of(snapshot_id <span style=color:#f92672>=&gt;</span> <span style=color:#ae81ff>1</span>, <span style=color:#66d9ef>table</span> <span style=color:#f92672>=&gt;</span> <span style=color:#e6db74>&#39;db.tbl&#39;</span>)
-</span></span></code></pre></div></div><div id=toc class=markdown-body><div id=full><nav id=TableOfContents><ul><li><a href=#usage>Usage</a><ul><li><a href=#named-arguments>Named arguments</a></li><li><a href=#positional-arguments>Positional arguments</a></li></ul></li><li><a href=#snapshot-management>Snapshot management</a><ul><li><a href=#rollback_to_snapshot><code>rollback_to_snapshot</code></a></li><li><a href=#rollback_to_timestamp><code>rollback_to_timestamp</code></a></li><li><a h [...]
+</span></span></code></pre></div><h2 id=change-data-capture>Change Data Capture</h2><h3 id=create_changelog_view><code>create_changelog_view</code></h3><p>Creates a view that contains the changes from a given table.</p><h4 id=usage-14>Usage</h4><table><thead><tr><th>Argument Name</th><th>Required?</th><th>Type</th><th>Description</th></tr></thead><tbody><tr><td><code>table</code></td><td>✔️</td><td>string</td><td>Name of the source table for the changelog</td></tr><tr><td><code>changelog [...]
+</span></span><span style=display:flex><span>  <span style=color:#66d9ef>table</span> <span style=color:#f92672>=&gt;</span> <span style=color:#e6db74>&#39;db.tbl&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#66d9ef>options</span> <span style=color:#f92672>=&gt;</span> <span style=color:#66d9ef>map</span>(<span style=color:#e6db74>&#39;start-snapshot-id&#39;</span>,<span style=color:#e6db74>&#39;1&#39;</span>,<span style=color:#e6db74>&#39;end-snapshot-id&#39;</span>, <span style=color:#e6db74>&#39;2&#39;</span>)
+</span></span><span style=display:flex><span>)
+</span></span></code></pre></div><p>Create a changelog view <code>my_changelog_view</code> based on the changes that happened between timestamp <code>1678335750489</code> (exclusive) and <code>1678992105265</code> (inclusive).</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>CALL</span> spark_catalog.<span style=color [...]
+</span></span><span style=display:flex><span>  <span style=color:#66d9ef>table</span> <span style=color:#f92672>=&gt;</span> <span style=color:#e6db74>&#39;db.tbl&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#66d9ef>options</span> <span style=color:#f92672>=&gt;</span> <span style=color:#66d9ef>map</span>(<span style=color:#e6db74>&#39;start-timestamp&#39;</span>,<span style=color:#e6db74>&#39;1678335750489&#39;</span>,<span style=color:#e6db74>&#39;end-timestamp&#39;</span>, <span style=color:#e6db74>&#39;1678992105265&#39;</span>),
+</span></span><span style=display:flex><span>  changelog_view <span style=color:#f92672>=&gt;</span> <span style=color:#e6db74>&#39;my_changelog_view&#39;</span>
+</span></span><span style=display:flex><span>)
+</span></span></code></pre></div><p>Create a changelog view that computes updates based on the identifier columns <code>id</code> and <code>name</code>.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>CALL</span> spark_catalog.<span style=color:#66d9ef>system</span>.create_changelog_view(
+</span></span><span style=display:flex><span>  <span style=color:#66d9ef>table</span> <span style=color:#f92672>=&gt;</span> <span style=color:#e6db74>&#39;db.tbl&#39;</span>,
+</span></span><span style=display:flex><span>  <span style=color:#66d9ef>options</span> <span style=color:#f92672>=&gt;</span> <span style=color:#66d9ef>map</span>(<span style=color:#e6db74>&#39;start-snapshot-id&#39;</span>,<span style=color:#e6db74>&#39;1&#39;</span>,<span style=color:#e6db74>&#39;end-snapshot-id&#39;</span>, <span style=color:#e6db74>&#39;2&#39;</span>),
+</span></span><span style=display:flex><span>  identifier_columns <span style=color:#f92672>=&gt;</span> array(<span style=color:#e6db74>&#39;id&#39;</span>, <span style=color:#e6db74>&#39;name&#39;</span>)
+</span></span><span style=display:flex><span>)
+</span></span></code></pre></div><p>Once the changelog view is created, you can query the view to see the changes that happened between the snapshots.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> tbl_changes
+</span></span></code></pre></div><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> tbl_changes <span style=color:#66d9ef>where</span> _change_type <span style=color:#f92672>=</span> <span style=color:#e6db74>&#39;INSERT&#39;</span> <spa [...]
+</span></span></code></pre></div><p>Please note that the changelog view includes Change Data Capture (CDC) metadata columns
+that provide additional information about the changes being tracked. These columns are:</p><ul><li><code>_change_type</code>: the type of change. It has one of the following values: <code>INSERT</code>, <code>DELETE</code>, <code>UPDATE_BEFORE</code>, or <code>UPDATE_AFTER</code>.</li><li><code>_change_ordinal</code>: the order of changes</li><li><code>_commit_snapshot_id</code>: the snapshot ID where the change occurred</li></ul><p>Here is an example of corresponding results. It shows t [...]
+second snapshot deleted 1 record.</p><table><thead><tr><th>id</th><th>name</th><th>_change_type</th><th>_change_ordinal</th><th>_commit_snapshot_id</th></tr></thead><tbody><tr><td>1</td><td>Alice</td><td>INSERT</td><td>0</td><td>5390529835796506035</td></tr><tr><td>2</td><td>Bob</td><td>INSERT</td><td>0</td><td>5390529835796506035</td></tr><tr><td>1</td><td>Alice</td><td>DELETE</td><td>1</td><td>8764748981452218370</td></tr></tbody></table><h4 id=carry-over-rows>Carry-over Rows</h4><p>Th [...]
+when using copy-on-write. For example, given a file that contains row1 <code>(id=1, name='Alice')</code> and row2 <code>(id=2, name='Bob')</code>.
+A copy-on-write delete of row2 would require erasing this file and preserving row1 in a new file. The changelog table
+reports this as the following pair of rows, despite it not being an actual change to the table.</p><table><thead><tr><th>id</th><th>name</th><th>_change_type</th></tr></thead><tbody><tr><td>1</td><td>Alice</td><td>DELETE</td></tr><tr><td>1</td><td>Alice</td><td>INSERT</td></tr></tbody></table><p>By default, this view finds the carry-over rows and removes them from the result. Users can disable this
+behavior by setting the <code>remove_carryovers</code> option to <code>false</code>.</p><h4 id=prepost-update-images>Pre/Post Update Images</h4><p>The procedure computes the pre/post update images if configured. Pre/post update images are converted from a
+pair of a delete row and an insert row. Identifier columns are used for determining whether an insert and a delete record
+refer to the same row. If the two records share the same values for the identifier columns, they are considered to be before
+and after states of the same row. You can either set identifier fields in the table schema or pass them as procedure parameters.</p><p>The following example shows pre/post update image computation with an identifier column (<code>id</code>), where a row deletion
+and an insertion with the same <code>id</code> are treated as a single update operation. Specifically, suppose we have the following pair of rows:</p><table><thead><tr><th>id</th><th>name</th><th>_change_type</th></tr></thead><tbody><tr><td>3</td><td>Robert</td><td>DELETE</td></tr><tr><td>3</td><td>Dan</td><td>INSERT</td></tr></tbody></table><p>In this case, the procedure marks the row before the update as an <code>UPDATE_BEFORE</code> image and the row after the update
+as an <code>UPDATE_AFTER</code> image, resulting in the following pre/post update images:</p><table><thead><tr><th>id</th><th>name</th><th>_change_type</th></tr></thead><tbody><tr><td>3</td><td>Robert</td><td>UPDATE_BEFORE</td></tr><tr><td>3</td><td>Dan</td><td>UPDATE_AFTER</td></tr></tbody></table></div><div id=toc class=markdown-body><div id=full><nav id=TableOfContents><ul><li><a href=#usage>Usage</a><ul><li><a href=#named-arguments>Named arguments</a></li><li><a href=#positional-argu [...]
 <script src=https://iceberg.apache.org/docs/1.2.1//js/jquery.easing.min.js></script>
 <script type=text/javascript src=https://iceberg.apache.org/docs/1.2.1//js/search.js></script>
 <script src=https://iceberg.apache.org/docs/1.2.1//js/bootstrap.min.js></script>
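As a follow-on to the create_changelog_view examples in the hunk above, a hedged sketch of inspecting the pre/post update images once such a view exists (reusing the my_changelog_view name from the earlier call; the filter values are the documented _change_type constants):

    -- list before/after images of updated rows in commit order
    SELECT * FROM my_changelog_view
    WHERE _change_type IN ('UPDATE_BEFORE', 'UPDATE_AFTER')
    ORDER BY _change_ordinal;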
diff --git a/docs/1.2.1/spark-queries/index.html b/docs/1.2.1/spark-queries/index.html
index b72e109a..19773f85 100644
--- a/docs/1.2.1/spark-queries/index.html
+++ b/docs/1.2.1/spark-queries/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class="collapse in"><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a id=active href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-p [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/spark-structured-streaming/index.html b/docs/1.2.1/spark-structured-streaming/index.html
index d3f5db64..1bc933d6 100644
--- a/docs/1.2.1/spark-structured-streaming/index.html
+++ b/docs/1.2.1/spark-structured-streaming/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class="collapse in"><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a id=active href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-p [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>
diff --git a/docs/1.2.1/spark-writes/index.html b/docs/1.2.1/spark-writes/index.html
index 8868b655..988596b6 100644
--- a/docs/1.2.1/spark-writes/index.html
+++ b/docs/1.2.1/spark-writes/index.html
@@ -9,7 +9,7 @@
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Spark class="collapse in"><ul class=sub-menu><li><a href=../spark-ddl/>DDL</a></li><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a id=active href=../spark-writes/>Writes</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-p [...]
 <i class="fa fa-chevron-right"></i>
-<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Enabling Iceberg in Flink</a></li><li><a href=../flink-connector/>Flink Connector</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_ [...]
+<i class="fa fa-chevron-down"></i></a></li><div id=Flink class=collapse><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/ [...]
 <i class="fa fa-chevron-right"></i>
 <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
 <i class="fa fa-chevron-right"></i>