You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "hililiwei (via GitHub)" <gi...@apache.org> on 2023/03/14 06:37:10 UTC

[GitHub] [iceberg] hililiwei opened a new pull request, #7099: Doc: Typeset Flink Doc

hililiwei opened a new pull request, #7099:
URL: https://github.com/apache/iceberg/pull/7099

   Retypeset the Flink document
   
   
   ![image](https://user-images.githubusercontent.com/59213263/224915901-50094134-61a8-46d8-9f4f-5dd54750c6cb.png)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] 911432 commented on a diff in pull request #7099: Doc: Retypeset the Flink document

Posted by "911432 (via GitHub)" <gi...@apache.org>.
911432 commented on code in PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#discussion_r1148552976


##########
docs/flink-getting-started.md:
##########
@@ -108,7 +110,7 @@ wget ${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_V
 ## Flink's Python API
 
 {{< hint info >}}
-PyFlink 1.6.1 [does not work on OSX with a M1 cpu](https://issues.apache.org/jira/browse/FLINK-28786) 
+PyFlink 1.6.1 [does not work on OSX with a M1 cpu](https://issues.apache.org/jira/browse/FLINK-28786)

Review Comment:
   I think the version is 1.16.1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] 911432 commented on pull request #7099: Doc: Retypeset the Flink document

Posted by "911432 (via GitHub)" <gi...@apache.org>.
911432 commented on PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#issuecomment-1484107149

   I have done rebase. #7212 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] 911432 commented on pull request #7099: Doc: Retypeset the Flink document

Posted by "911432 (via GitHub)" <gi...@apache.org>.
911432 commented on PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#issuecomment-1484114066

   I have done rebase. https://github.com/apache/iceberg/pull/7212 https://github.com/apache/iceberg/pull/7213


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hililiwei commented on a diff in pull request #7099: Doc: Retypeset the Flink document

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#discussion_r1136483829


##########
docs/flink-ddl.md:
##########
@@ -0,0 +1,234 @@
+---
+title: "Flink DDL"
+url: flink-ddl
+aliases:
+    - "flink/flink-ddl"
+menu:
+    main:
+        parent: Flink
+        weight: 200
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## DDL commands
+
+###  `Create Catalog`
+
+#### Catalog Configuration

Review Comment:
   Added configuration page.
   ![image](https://user-images.githubusercontent.com/59213263/225194756-a6bfafda-338e-459a-8b5c-73a3d45ce663.png)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] 911432 commented on a diff in pull request #7099: Doc: Retypeset the Flink document

Posted by "911432 (via GitHub)" <gi...@apache.org>.
911432 commented on code in PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#discussion_r1148549357


##########
docs/flink-getting-started.md:
##########
@@ -219,103 +221,9 @@ The following properties can be set if using the Hive catalog:
 * `clients`: The Hive metastore client pool size, default value is 2. (Optional)
 * `warehouse`: The Hive warehouse location, users should specify this path if neither set the `hive-conf-dir` to specify a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to classpath.
 * `hive-conf-dir`: Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or hive configure file from classpath) will be overwritten with the `warehouse` value if setting both `hive-conf-dir` and `warehouse` when creating iceberg catalog.
-* `hadoop-conf-dir`: Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values. 
+* `hadoop-conf-dir`: Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values.

Review Comment:
   Because I don't know if I can delete it or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] 911432 commented on a diff in pull request #7099: Doc: Retypeset the Flink document

Posted by "911432 (via GitHub)" <gi...@apache.org>.
911432 commented on code in PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#discussion_r1148549357


##########
docs/flink-getting-started.md:
##########
@@ -219,103 +221,9 @@ The following properties can be set if using the Hive catalog:
 * `clients`: The Hive metastore client pool size, default value is 2. (Optional)
 * `warehouse`: The Hive warehouse location, users should specify this path if neither set the `hive-conf-dir` to specify a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to classpath.
 * `hive-conf-dir`: Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or hive configure file from classpath) will be overwritten with the `warehouse` value if setting both `hive-conf-dir` and `warehouse` when creating iceberg catalog.
-* `hadoop-conf-dir`: Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values. 
+* `hadoop-conf-dir`: Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values.

Review Comment:
   Because I don't know if I can delete it or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] 911432 commented on a diff in pull request #7099: Doc: Retypeset the Flink document

Posted by "911432 (via GitHub)" <gi...@apache.org>.
911432 commented on code in PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#discussion_r1148552976


##########
docs/flink-getting-started.md:
##########
@@ -108,7 +110,7 @@ wget ${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_V
 ## Flink's Python API
 
 {{< hint info >}}
-PyFlink 1.6.1 [does not work on OSX with a M1 cpu](https://issues.apache.org/jira/browse/FLINK-28786) 
+PyFlink 1.6.1 [does not work on OSX with a M1 cpu](https://issues.apache.org/jira/browse/FLINK-28786)

Review Comment:
   I think the version is 1.16.1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hililiwei commented on a diff in pull request #7099: Doc: Retypeset the Flink document

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#discussion_r1153976861


##########
docs/flink-getting-started.md:
##########
@@ -219,103 +221,9 @@ The following properties can be set if using the Hive catalog:
 * `clients`: The Hive metastore client pool size, default value is 2. (Optional)
 * `warehouse`: The Hive warehouse location, users should specify this path if neither set the `hive-conf-dir` to specify a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to classpath.
 * `hive-conf-dir`: Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or hive configure file from classpath) will be overwritten with the `warehouse` value if setting both `hive-conf-dir` and `warehouse` when creating iceberg catalog.
-* `hadoop-conf-dir`: Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values. 
+* `hadoop-conf-dir`: Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values.

Review Comment:
   My intention was to keep a sample catalog so that readers could quickly use it to create tables. If hive-catalog is removed, readers may be confused about how to create a working catalog.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] 911432 commented on pull request #7099: Doc: Retypeset the Flink document

Posted by "911432 (via GitHub)" <gi...@apache.org>.
911432 commented on PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#issuecomment-1491164549

   I have done rebase about https://github.com/apache/iceberg/pull/7245 and https://github.com/apache/iceberg/pull/7246


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko merged pull request #7099: Doc: Retypeset the Flink document

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko merged PR #7099:
URL: https://github.com/apache/iceberg/pull/7099


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] 911432 commented on pull request #7099: Doc: Retypeset the Flink document

Posted by "911432 (via GitHub)" <gi...@apache.org>.
911432 commented on PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#issuecomment-1484121692

   I have done rebase about https://github.com/apache/iceberg/pull/7214 and https://github.com/apache/iceberg/pull/7215


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #7099: Doc: Retypeset the Flink document

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#discussion_r1135184392


##########
docs/flink-ddl.md:
##########
@@ -0,0 +1,234 @@
+---
+title: "Flink DDL"
+url: flink-ddl
+aliases:
+    - "flink/flink-ddl"
+menu:
+    main:
+        parent: Flink
+        weight: 200
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## DDL commands

Review Comment:
   For the other docs, we tend to start with a single hash instead of two.



##########
docs/flink-actions.md:
##########
@@ -0,0 +1,42 @@
+---
+title: "Flink Actions"
+url: flink-actions
+aliases:
+    - "flink/flink-actions"
+menu:
+    main:
+        parent: Flink
+        weight: 500
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Rewrite files action.
+
+Iceberg provides API to rewrite small files into large files by submitting flink batch job. The behavior of this flink action is the same as the spark's [rewriteDataFiles](../maintenance/#compact-data-files).
+
+```java
+import org.apache.iceberg.flink.actions.Actions;
+
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+Table table = tableLoader.loadTable();
+RewriteDataFilesActionResult result = Actions.forTable(table)
+        .rewriteDataFiles()
+        .execute();
+```
+
+For more doc about options of the rewrite files action, please see [RewriteDataFilesAction](../../../javadoc/{{% icebergVersion %}}/org/apache/iceberg/flink/actions/RewriteDataFilesAction.html)

Review Comment:
   ```suggestion
   For more details of the rewrite files action, please refer to [RewriteDataFilesAction](../../../javadoc/{{% icebergVersion %}}/org/apache/iceberg/flink/actions/RewriteDataFilesAction.html)
   ```



##########
docs/flink-writes.md:
##########
@@ -0,0 +1,269 @@
+---
+title: "Flink Writes"
+url: flink-writes
+aliases:
+    - "flink/flink-writes"
+menu:
+    main:
+        parent: Flink
+        weight: 400
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Writing with SQL
+
+Iceberg support both `INSERT INTO` and `INSERT OVERWRITE`.
+
+### `INSERT INTO`
+
+To append new data to a table with a Flink streaming job, use `INSERT INTO`:
+
+```sql
+INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
+INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from other_kafka_table;
+```
+
+### `INSERT OVERWRITE`
+
+To replace data in the table with the result of a query, use `INSERT OVERWRITE` in batch job (flink streaming job does not support `INSERT OVERWRITE`). Overwrites are atomic operations for Iceberg tables.
+
+Partitions that have rows produced by the SELECT query will be replaced, for example:
+
+```sql
+INSERT OVERWRITE sample VALUES (1, 'a');
+```
+
+Iceberg also support overwriting given partitions by the `select` values:
+
+```sql
+INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
+```
+
+For a partitioned iceberg table, when all the partition columns are set a value in `PARTITION` clause, it is inserting into a static partition, otherwise if partial partition columns (prefix part of all partition columns) are set a value in `PARTITION` clause, it is writing the query result into a dynamic partition.
+For an unpartitioned iceberg table, its data will be completely overwritten by `INSERT OVERWRITE`.
+
+### `UPSERT`
+
+Iceberg supports `UPSERT` based on the primary key when writing data into v2 table format. There are two ways to enable upsert.
+
+1. Enable the `UPSERT` mode as table-level property `write.upsert.enabled`. Here is an example SQL statement to set the table property when creating a table. It would be applied for all write paths to this table (batch or streaming) unless overwritten by write options as described later.
+
+```sql
+CREATE TABLE `hive_catalog`.`default`.`sample` (
+  `id`  INT UNIQUE COMMENT 'unique id',
+  `data` STRING NOT NULL,
+ PRIMARY KEY(`id`) NOT ENFORCED
+) with ('format-version'='2', 'write.upsert.enabled'='true');
+```
+
+2. Enabling `UPSERT` mode using `upsert-enabled` in the [write options](#Write-options) provides more flexibility than a table level config. Note that you still need to use v2 table format and specify the primary key when creating the table.
+
+```sql
+INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
+...
+```
+
+{{< hint info >}}
+OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.
+{{< /hint >}}
+
+
+
+## Writing with DataStream
+
+Iceberg support writing to iceberg table from different DataStream input.
+
+
+### Appending data.
+
+Flink supports writing `DataStream<RowData>` and `DataStream<Row>` to the sink iceberg table natively.
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+The iceberg API also allows users to write generic `DataStream<T>` to iceberg table, more example could be found in this [unit test](https://github.com/apache/iceberg/blob/master/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java).
+
+### Overwrite data
+
+Set the `overwrite` flag in FlinkSink builder to overwrite the data in existing iceberg tables:
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .overwrite(true)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+### Upsert data
+
+Set the `upsert` flag in FlinkSink builder to upsert the data in existing iceberg table. The table must use v2 table format and have a primary key.
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .upsert(true)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+{{< hint info >}}
+OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.
+{{< /hint >}}
+
+### Write with Avro GenericRecord
+
+Flink Iceberg sink provides `AvroGenericRecordToRowDataMapper` that converts
+Avro `GenericRecord` to Flink `RowData`. You can use the mapper to write
+Avro GenericRecord DataStream to Iceberg.
+
+Please make sure `flink-avro` jar is included in the classpath.
+Also `iceberg-flink-runtime` shaded bundle jar can't be used
+because the runtime jar shades the avro package.
+Please use non-shaded `iceberg-flink` jar instead.
+
+```java
+DataStream<org.apache.avro.generic.GenericRecord> dataStream = ...;
+
+Schema icebergSchema = table.schema();
+
+
+// The Avro schema converted from Iceberg schema can't be used
+// due to precision difference between how Iceberg schema (micro)
+// and Flink AvroToRowDataConverters (milli) deal with time type.
+// Instead, use the Avro schema defined directly.
+// See AvroGenericRecordToRowDataMapper Javadoc for more details.
+org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, table.name());
+
+GenericRecordAvroTypeInfo avroTypeInfo = new GenericRecordAvroTypeInfo(avroSchema);
+RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+
+FlinkSink.builderFor(
+    dataStream,
+    AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema),
+    FlinkCompatibilityUtil.toTypeInfo(rowType))
+  .table(table)
+  .tableLoader(tableLoader)
+  .append();
+```
+
+### Netrics

Review Comment:
   ```suggestion
   ### Metrics
   ```



##########
docs/flink-writes.md:
##########
@@ -0,0 +1,269 @@
+---
+title: "Flink Writes"
+url: flink-writes
+aliases:
+    - "flink/flink-writes"
+menu:
+    main:
+        parent: Flink
+        weight: 400
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Writing with SQL
+
+Iceberg support both `INSERT INTO` and `INSERT OVERWRITE`.
+
+### `INSERT INTO`
+
+To append new data to a table with a Flink streaming job, use `INSERT INTO`:
+
+```sql
+INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
+INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from other_kafka_table;
+```
+
+### `INSERT OVERWRITE`
+
+To replace data in the table with the result of a query, use `INSERT OVERWRITE` in batch job (flink streaming job does not support `INSERT OVERWRITE`). Overwrites are atomic operations for Iceberg tables.
+
+Partitions that have rows produced by the SELECT query will be replaced, for example:
+
+```sql
+INSERT OVERWRITE sample VALUES (1, 'a');
+```
+
+Iceberg also support overwriting given partitions by the `select` values:
+
+```sql
+INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
+```
+
+For a partitioned iceberg table, when all the partition columns are set a value in `PARTITION` clause, it is inserting into a static partition, otherwise if partial partition columns (prefix part of all partition columns) are set a value in `PARTITION` clause, it is writing the query result into a dynamic partition.
+For an unpartitioned iceberg table, its data will be completely overwritten by `INSERT OVERWRITE`.
+
+### `UPSERT`
+
+Iceberg supports `UPSERT` based on the primary key when writing data into v2 table format. There are two ways to enable upsert.
+
+1. Enable the `UPSERT` mode as table-level property `write.upsert.enabled`. Here is an example SQL statement to set the table property when creating a table. It would be applied for all write paths to this table (batch or streaming) unless overwritten by write options as described later.
+
+```sql
+CREATE TABLE `hive_catalog`.`default`.`sample` (
+  `id`  INT UNIQUE COMMENT 'unique id',
+  `data` STRING NOT NULL,
+ PRIMARY KEY(`id`) NOT ENFORCED
+) with ('format-version'='2', 'write.upsert.enabled'='true');
+```
+
+2. Enabling `UPSERT` mode using `upsert-enabled` in the [write options](#Write-options) provides more flexibility than a table level config. Note that you still need to use v2 table format and specify the primary key when creating the table.
+
+```sql
+INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
+...
+```
+
+{{< hint info >}}
+OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.
+{{< /hint >}}
+
+
+
+## Writing with DataStream
+
+Iceberg support writing to iceberg table from different DataStream input.
+
+
+### Appending data.
+
+Flink supports writing `DataStream<RowData>` and `DataStream<Row>` to the sink iceberg table natively.
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+The iceberg API also allows users to write generic `DataStream<T>` to iceberg table, more example could be found in this [unit test](https://github.com/apache/iceberg/blob/master/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java).
+
+### Overwrite data
+
+Set the `overwrite` flag in FlinkSink builder to overwrite the data in existing iceberg tables:
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .overwrite(true)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+### Upsert data
+
+Set the `upsert` flag in FlinkSink builder to upsert the data in existing iceberg table. The table must use v2 table format and have a primary key.
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .upsert(true)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+{{< hint info >}}
+OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.
+{{< /hint >}}
+
+### Write with Avro GenericRecord
+
+Flink Iceberg sink provides `AvroGenericRecordToRowDataMapper` that converts
+Avro `GenericRecord` to Flink `RowData`. You can use the mapper to write
+Avro GenericRecord DataStream to Iceberg.
+
+Please make sure `flink-avro` jar is included in the classpath.
+Also `iceberg-flink-runtime` shaded bundle jar can't be used
+because the runtime jar shades the avro package.
+Please use non-shaded `iceberg-flink` jar instead.
+
+```java
+DataStream<org.apache.avro.generic.GenericRecord> dataStream = ...;
+
+Schema icebergSchema = table.schema();
+
+
+// The Avro schema converted from Iceberg schema can't be used
+// due to precision difference between how Iceberg schema (micro)
+// and Flink AvroToRowDataConverters (milli) deal with time type.
+// Instead, use the Avro schema defined directly.
+// See AvroGenericRecordToRowDataMapper Javadoc for more details.
+org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, table.name());
+
+GenericRecordAvroTypeInfo avroTypeInfo = new GenericRecordAvroTypeInfo(avroSchema);
+RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+
+FlinkSink.builderFor(
+    dataStream,
+    AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema),
+    FlinkCompatibilityUtil.toTypeInfo(rowType))
+  .table(table)
+  .tableLoader(tableLoader)
+  .append();
+```
+
+### Netrics
+
+The following Flink metrics are provided by the Flink Iceberg sink.
+
+Parallel writer metrics are added under the sub group of `IcebergStreamWriter`.
+They should have the following key-value tags.
+
+* table: full table name (like iceberg.my_db.my_table)
+* subtask_index: writer subtask index starting from 0
+
+ Metric name                | Metric type | Description                                                                                         |
+| ------------------------- |------------|-----------------------------------------------------------------------------------------------------|
+| lastFlushDurationMs       | Gague      | The duration (in milli) that writer subtasks take to flush and upload the files during checkpoint.  |
+| flushedDataFiles          | Counter    | Number of data files flushed and uploaded.                                                          |
+| flushedDeleteFiles        | Counter    | Number of delete files flushed and uploaded.                                                        |
+| flushedReferencedDataFiles| Counter    | Number of data files referenced by the flushed delete files.                                        |
+| dataFilesSizeHistogram    | Histogram  | Histogram distribution of data file sizes (in bytes).                                               |
+| deleteFilesSizeHistogram  | Histogram  | Histogram distribution of delete file sizes (in bytes).                                             |
+
+Committer metrics are added under the sub group of `IcebergFilesCommitter`.
+They should have the following key-value tags.
+
+* table: full table name (like iceberg.my_db.my_table)
+
+ Metric name                      | Metric type | Description                                                                |
+|---------------------------------|--------|----------------------------------------------------------------------------|
+| lastCheckpointDurationMs        | Gague  | The duration (in milli) that the committer operator checkpoints its state. |
+| lastCommitDurationMs            | Gague  | The duration (in milli) that the Iceberg table commit takes.               |
+| committedDataFilesCount         | Counter | Number of data files committed.                                            |
+| committedDataFilesRecordCount   | Counter | Number of records contained in the committed data files.                   |
+| committedDataFilesByteCount     | Counter | Number of bytes contained in the committed data files.                     |
+| committedDeleteFilesCount       | Counter | Number of delete files committed.                                          |
+| committedDeleteFilesRecordCount | Counter | Number of records contained in the committed delete files.                 |
+| committedDeleteFilesByteCount   | Counter | Number of bytes contained in the committed delete files.                   |
+| elapsedSecondsSinceLastSuccessfulCommit| Gague  | Elapsed time (in seconds) since last successful Iceberg commit.            |
+
+`elapsedSecondsSinceLastSuccessfulCommit` is an ideal alerting metric
+to detect failed or missing Iceberg commits.
+
+* Iceberg commit happened after successful Flink checkpoint in the `notifyCheckpointComplete` callback.
+  It could happen that Iceberg commits failed (for whatever reason), while Flink checkpoints succeeding.
+* It could also happen that `notifyCheckpointComplete` wasn't triggered (for whatever bug).
+  As a result, there won't be any Iceberg commits attempted.
+
+If the checkpoint interval (and expected Iceberg commit interval) is 5 minutes, set up alert with rule like `elapsedSecondsSinceLastSuccessfulCommit > 60 minutes` to detect failed or missing Iceberg commits in the past hour.
+
+
+
+## Options
+
+### Write options
+
+Flink write options are passed when configuring the FlinkSink, like this:
+
+```

Review Comment:
   ```suggestion
   ```java
   ```



##########
docs/flink-queries.md:
##########
@@ -0,0 +1,468 @@
+---
+title: "Flink Queries"
+url: flink-queries
+aliases:
+   - "flink/flink-queries"
+menu:
+   main:
+      parent: Flink
+      weight: 300
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Reading with SQL
+
+Iceberg support both streaming and batch read in Flink. Execute the following sql command to switch execution mode from `streaming` to `batch`, and vice versa:
+
+```sql
+-- Execute the flink job in streaming mode for current session context
+SET execution.runtime-mode = streaming;
+
+-- Execute the flink job in batch mode for current session context
+SET execution.runtime-mode = batch;
+```
+
+### Flink batch read
+
+Submit a Flink __batch__ job using the following sentences:
+
+```sql
+-- Execute the flink job in batch mode for current session context
+SET execution.runtime-mode = batch;
+SELECT * FROM sample;
+```
+
+### Flink streaming read
+
+Iceberg supports processing incremental data in flink streaming jobs which starts from a historical snapshot-id:

Review Comment:
   ```suggestion
   Iceberg supports processing incremental data in Flink streaming jobs which starts from a historical snapshot-id:
   ```



##########
docs/flink-ddl.md:
##########
@@ -0,0 +1,234 @@
+---
+title: "Flink DDL"
+url: flink-ddl
+aliases:
+    - "flink/flink-ddl"
+menu:
+    main:
+        parent: Flink
+        weight: 200
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## DDL commands
+
+###  `Create Catalog`
+
+#### Catalog Configuration

Review Comment:
   Should we split this one out into `flink-configuration.md`? If you're looking for DDL, you'll first encounter a long list of catalog configuration options.



##########
docs/flink-actions.md:
##########
@@ -0,0 +1,42 @@
+---
+title: "Flink Actions"
+url: flink-actions
+aliases:
+    - "flink/flink-actions"
+menu:
+    main:
+        parent: Flink
+        weight: 500
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Rewrite files action.
+
+Iceberg provides API to rewrite small files into large files by submitting flink batch job. The behavior of this flink action is the same as the spark's [rewriteDataFiles](../maintenance/#compact-data-files).

Review Comment:
   ```suggestion
   Iceberg provides API to rewrite small files into large files by submitting Flink batch jobs. The behavior of this Flink action is the same as Spark's [rewriteDataFiles](../maintenance/#compact-data-files).
   ```



##########
docs/flink-actions.md:
##########
@@ -0,0 +1,42 @@
+---
+title: "Flink Actions"
+url: flink-actions
+aliases:
+    - "flink/flink-actions"
+menu:
+    main:
+        parent: Flink
+        weight: 500
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Rewrite files action.
+
+Iceberg provides API to rewrite small files into large files by submitting flink batch job. The behavior of this flink action is the same as the spark's [rewriteDataFiles](../maintenance/#compact-data-files).

Review Comment:
   With https://github.com/apache/iceberg/pull/7070, I capitalized Spark and Flink. Since it is not a verb (yet). But I'm a Dutchie and not a native English speaker.



##########
docs/flink-writes.md:
##########
@@ -0,0 +1,269 @@
+---
+title: "Flink Writes"
+url: flink-writes
+aliases:
+    - "flink/flink-writes"
+menu:
+    main:
+        parent: Flink
+        weight: 400
+---
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one or more
+ - contributor license agreements.  See the NOTICE file distributed with
+ - this work for additional information regarding copyright ownership.
+ - The ASF licenses this file to You under the Apache License, Version 2.0
+ - (the "License"); you may not use this file except in compliance with
+ - the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing, software
+ - distributed under the License is distributed on an "AS IS" BASIS,
+ - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ - See the License for the specific language governing permissions and
+ - limitations under the License.
+ -->
+
+## Writing with SQL
+
+Iceberg support both `INSERT INTO` and `INSERT OVERWRITE`.
+
+### `INSERT INTO`
+
+To append new data to a table with a Flink streaming job, use `INSERT INTO`:
+
+```sql
+INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
+INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from other_kafka_table;
+```
+
+### `INSERT OVERWRITE`
+
+To replace data in the table with the result of a query, use `INSERT OVERWRITE` in batch job (flink streaming job does not support `INSERT OVERWRITE`). Overwrites are atomic operations for Iceberg tables.
+
+Partitions that have rows produced by the SELECT query will be replaced, for example:
+
+```sql
+INSERT OVERWRITE sample VALUES (1, 'a');
+```
+
+Iceberg also support overwriting given partitions by the `select` values:
+
+```sql
+INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
+```
+
+For a partitioned iceberg table, when all the partition columns are set a value in `PARTITION` clause, it is inserting into a static partition, otherwise if partial partition columns (prefix part of all partition columns) are set a value in `PARTITION` clause, it is writing the query result into a dynamic partition.
+For an unpartitioned iceberg table, its data will be completely overwritten by `INSERT OVERWRITE`.
+
+### `UPSERT`
+
+Iceberg supports `UPSERT` based on the primary key when writing data into v2 table format. There are two ways to enable upsert.
+
+1. Enable the `UPSERT` mode as table-level property `write.upsert.enabled`. Here is an example SQL statement to set the table property when creating a table. It would be applied for all write paths to this table (batch or streaming) unless overwritten by write options as described later.
+
+```sql
+CREATE TABLE `hive_catalog`.`default`.`sample` (
+  `id`  INT UNIQUE COMMENT 'unique id',
+  `data` STRING NOT NULL,
+ PRIMARY KEY(`id`) NOT ENFORCED
+) with ('format-version'='2', 'write.upsert.enabled'='true');
+```
+
+2. Enabling `UPSERT` mode using `upsert-enabled` in the [write options](#Write-options) provides more flexibility than a table level config. Note that you still need to use v2 table format and specify the primary key when creating the table.
+
+```sql
+INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
+...
+```
+
+{{< hint info >}}
+OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.
+{{< /hint >}}
+
+
+
+## Writing with DataStream
+
+Iceberg support writing to iceberg table from different DataStream input.
+
+
+### Appending data.
+
+Flink supports writing `DataStream<RowData>` and `DataStream<Row>` to the sink iceberg table natively.
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+The iceberg API also allows users to write generic `DataStream<T>` to iceberg table, more example could be found in this [unit test](https://github.com/apache/iceberg/blob/master/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java).
+
+### Overwrite data
+
+Set the `overwrite` flag in FlinkSink builder to overwrite the data in existing iceberg tables:
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .overwrite(true)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+### Upsert data
+
+Set the `upsert` flag in FlinkSink builder to upsert the data in existing iceberg table. The table must use v2 table format and have a primary key.
+
+```java
+StreamExecutionEnvironment env = ...;
+
+DataStream<RowData> input = ... ;
+Configuration hadoopConf = new Configuration();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
+
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .upsert(true)
+    .append();
+
+env.execute("Test Iceberg DataStream");
+```
+
+{{< hint info >}}
+OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.
+{{< /hint >}}
+
+### Write with Avro GenericRecord
+
+Flink Iceberg sink provides `AvroGenericRecordToRowDataMapper` that converts
+Avro `GenericRecord` to Flink `RowData`. You can use the mapper to write
+Avro GenericRecord DataStream to Iceberg.
+
+Please make sure `flink-avro` jar is included in the classpath.
+Also `iceberg-flink-runtime` shaded bundle jar can't be used
+because the runtime jar shades the avro package.
+Please use non-shaded `iceberg-flink` jar instead.
+
+```java
+DataStream<org.apache.avro.generic.GenericRecord> dataStream = ...;
+
+Schema icebergSchema = table.schema();
+
+
+// The Avro schema converted from Iceberg schema can't be used
+// due to precision difference between how Iceberg schema (micro)
+// and Flink AvroToRowDataConverters (milli) deal with time type.
+// Instead, use the Avro schema defined directly.
+// See AvroGenericRecordToRowDataMapper Javadoc for more details.
+org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, table.name());
+
+GenericRecordAvroTypeInfo avroTypeInfo = new GenericRecordAvroTypeInfo(avroSchema);
+RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+
+FlinkSink.builderFor(
+    dataStream,
+    AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema),
+    FlinkCompatibilityUtil.toTypeInfo(rowType))
+  .table(table)
+  .tableLoader(tableLoader)
+  .append();
+```
+
+### Netrics
+
+The following Flink metrics are provided by the Flink Iceberg sink.
+
+Parallel writer metrics are added under the sub group of `IcebergStreamWriter`.
+They should have the following key-value tags.
+
+* table: full table name (like iceberg.my_db.my_table)
+* subtask_index: writer subtask index starting from 0
+
+ Metric name                | Metric type | Description                                                                                         |
+| ------------------------- |------------|-----------------------------------------------------------------------------------------------------|
+| lastFlushDurationMs       | Gague      | The duration (in milli) that writer subtasks take to flush and upload the files during checkpoint.  |
+| flushedDataFiles          | Counter    | Number of data files flushed and uploaded.                                                          |
+| flushedDeleteFiles        | Counter    | Number of delete files flushed and uploaded.                                                        |
+| flushedReferencedDataFiles| Counter    | Number of data files referenced by the flushed delete files.                                        |
+| dataFilesSizeHistogram    | Histogram  | Histogram distribution of data file sizes (in bytes).                                               |
+| deleteFilesSizeHistogram  | Histogram  | Histogram distribution of delete file sizes (in bytes).                                             |
+
+Committer metrics are added under the sub group of `IcebergFilesCommitter`.
+They should have the following key-value tags.
+
+* table: full table name (like iceberg.my_db.my_table)
+
+ Metric name                      | Metric type | Description                                                                |
+|---------------------------------|--------|----------------------------------------------------------------------------|
+| lastCheckpointDurationMs        | Gague  | The duration (in milli) that the committer operator checkpoints its state. |
+| lastCommitDurationMs            | Gague  | The duration (in milli) that the Iceberg table commit takes.               |
+| committedDataFilesCount         | Counter | Number of data files committed.                                            |
+| committedDataFilesRecordCount   | Counter | Number of records contained in the committed data files.                   |
+| committedDataFilesByteCount     | Counter | Number of bytes contained in the committed data files.                     |
+| committedDeleteFilesCount       | Counter | Number of delete files committed.                                          |
+| committedDeleteFilesRecordCount | Counter | Number of records contained in the committed delete files.                 |
+| committedDeleteFilesByteCount   | Counter | Number of bytes contained in the committed delete files.                   |
+| elapsedSecondsSinceLastSuccessfulCommit| Gague  | Elapsed time (in seconds) since last successful Iceberg commit.            |
+
+`elapsedSecondsSinceLastSuccessfulCommit` is an ideal alerting metric
+to detect failed or missing Iceberg commits.
+
+* Iceberg commit happened after successful Flink checkpoint in the `notifyCheckpointComplete` callback.
+  It could happen that Iceberg commits failed (for whatever reason), while Flink checkpoints succeeding.
+* It could also happen that `notifyCheckpointComplete` wasn't triggered (for whatever bug).
+  As a result, there won't be any Iceberg commits attempted.
+
+If the checkpoint interval (and expected Iceberg commit interval) is 5 minutes, set up alert with rule like `elapsedSecondsSinceLastSuccessfulCommit > 60 minutes` to detect failed or missing Iceberg commits in the past hour.
+
+
+
+## Options
+
+### Write options
+
+Flink write options are passed when configuring the FlinkSink, like this:
+
+```
+FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+    .table(table)
+    .tableLoader(tableLoader)
+    .set("write-format", "orc")
+    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
+```
+
+For Flink SQL, write options can be passed in via SQL hints like this:
+
+```

Review Comment:
   ```suggestion
   ```sql
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] 911432 commented on pull request #7099: Doc: Retypeset the Flink document

Posted by "911432 (via GitHub)" <gi...@apache.org>.
911432 commented on PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#issuecomment-1491169518

   I have done rebase about https://github.com/apache/iceberg/pull/7246 and https://github.com/apache/iceberg/pull/7247


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on pull request #7099: Doc: Retypeset the Flink document

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#issuecomment-1481848100

   Merging this since there are other PRs affecting the Flink docs as well 👍🏻 Thanks @hililiwei for picking this up, this is a major improvement.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7099: Doc: Retypeset the Flink document

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#discussion_r1145655549


##########
docs/flink-getting-started.md:
##########
@@ -219,103 +221,9 @@ The following properties can be set if using the Hive catalog:
 * `clients`: The Hive metastore client pool size, default value is 2. (Optional)
 * `warehouse`: The Hive warehouse location, users should specify this path if neither set the `hive-conf-dir` to specify a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to classpath.
 * `hive-conf-dir`: Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or hive configure file from classpath) will be overwritten with the `warehouse` value if setting both `hive-conf-dir` and `warehouse` when creating iceberg catalog.
-* `hadoop-conf-dir`: Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values. 
+* `hadoop-conf-dir`: Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values.

Review Comment:
   why is the hive catalog section not completely removed here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] 911432 commented on a diff in pull request #7099: Doc: Retypeset the Flink document

Posted by "911432 (via GitHub)" <gi...@apache.org>.
911432 commented on code in PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#discussion_r1148552500


##########
docs/flink-getting-started.md:
##########
@@ -108,7 +110,7 @@ wget ${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_V
 ## Flink's Python API
 
 {{< hint info >}}
-PyFlink 1.6.1 [does not work on OSX with a M1 cpu](https://issues.apache.org/jira/browse/FLINK-28786) 
+PyFlink 1.6.1 [does not work on OSX with a M1 cpu](https://issues.apache.org/jira/browse/FLINK-28786)

Review Comment:
   I think the version is 1.16.1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] 911432 commented on pull request #7099: Doc: Retypeset the Flink document

Posted by "911432 (via GitHub)" <gi...@apache.org>.
911432 commented on PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#issuecomment-1484114206

   I have done rebase about https://github.com/apache/iceberg/pull/7212 and https://github.com/apache/iceberg/pull/7213


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hililiwei commented on pull request #7099: Doc: Retypeset the Flink document

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on PR #7099:
URL: https://github.com/apache/iceberg/pull/7099#issuecomment-1491240764

   thank you guys, @Fokko @stevenzwu .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org