Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/16 09:06:59 UTC

[incubator-inlong-website] branch master updated: [INLONG-407][Sort] Add pulsar and iceberg docs (#412)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a95cbda9 [INLONG-407][Sort] Add pulsar and iceberg docs (#412)
8a95cbda9 is described below

commit 8a95cbda946666eda465fc42d98e8ed2934d73ea
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Thu Jun 16 17:06:55 2022 +0800

    [INLONG-407][Sort] Add pulsar and iceberg docs (#412)
    
    Co-authored-by: dockerzhang <do...@apache.org>
---
 docs/data_node/extract_node/pulsar.md              | 125 +++++++++++++-
 docs/data_node/load_node/iceberg.md                | 187 +++++++++++++++++++-
 .../current/data_node/extract_node/pulsar.md       | 123 ++++++++++++-
 .../current/data_node/load_node/iceberg.md         | 191 ++++++++++++++++++++-
 4 files changed, 615 insertions(+), 11 deletions(-)

diff --git a/docs/data_node/extract_node/pulsar.md b/docs/data_node/extract_node/pulsar.md
index e452c4e72..cd6f482ac 100644
--- a/docs/data_node/extract_node/pulsar.md
+++ b/docs/data_node/extract_node/pulsar.md
@@ -1,4 +1,127 @@
 ---
 title: Pulsar
 sidebar_position: 10
----
\ No newline at end of file
+---
+
+## Overview
+
+[Apache Pulsar](https://pulsar.apache.org/) is a distributed, open source pub-sub messaging and streaming platform for real-time workloads, managing hundreds of billions of events per day.
+
+## Version
+
+| Extract Node          | Version                                                      |
+| --------------------- | ------------------------------------------------------------ |
+| [Pulsar](./pulsar.md) | [Pulsar](https://pulsar.apache.org/docs/next/): >= 2.8.x<br/> |
+
+## Dependencies
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-pulsar</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## Usage
+
+### Usage for SQL API
+
+Step.1 Prepare the SQL client
+
+The [SQL Client](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sqlClient.html) is used to write SQL queries for manipulating data in Pulsar. You can use the `-addclasspath` option to add the `sort-connector-pulsar-{{INLONG_VERSION}}.jar` package.
+
+**Example**
+
+```shell
+./bin/sql-client.sh embedded --jar sort-connector-pulsar_{{INLONG_VERSION}}.jar
+```
+
+> Note
+> If you put the JAR package of our connector under `$FLINK_HOME/lib`, do not use `--jar` again to specify the package of the connector.
+
+Step.2 Read data from Pulsar
+
+```sql
+CREATE TABLE pulsar (
+  `physical_1` STRING,
+  `physical_2` INT,
+  `eventTime` TIMESTAMP(3) METADATA,
+  `properties` MAP<STRING, STRING> METADATA ,
+  `topic` STRING METADATA VIRTUAL,
+  `sequenceId` BIGINT METADATA VIRTUAL,
+  `key` STRING ,
+  `physical_3` BOOLEAN
+) WITH (
+  'connector' = 'pulsar',
+  'topic' = 'persistent://public/default/topic82547611',
+  'key.format' = 'raw',
+  'key.fields' = 'key',
+  'value.format' = 'avro',
+  'service-url' = 'pulsar://localhost:6650',
+  'admin-url' = 'http://localhost:8080',
+  'scan.startup.mode' = 'earliest' 
+);
+
+INSERT INTO `sink_table`
+    SELECT
+    `physical_1` AS `physical_1`,
+    `physical_2` AS `physical_2`
+    FROM `pulsar`;
+
+INSERT INTO pulsar
+VALUES
+ ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k11', 'v11', 'k12', 'v12'], 'key1', TRUE),
+ ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', MAP['k21', 'v21', 'k22', 'v22'], 'key2', FALSE),
+ ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k31', 'v31', 'k32', 'v32'], 'key3', TRUE);
+```
+
+### Usage for InLong Dashboard
+
+TODO
+
+### Usage for InLong Manager Client
+
+TODO
+
+## Pulsar Extract Node Options
+
+| Parameter                     | Required | Default value | Type   | Description                                                  |
+| ----------------------------- | -------- | ------------- | ------ | ------------------------------------------------------------ |
+| connector                     | required | (none)        | String | Set the connector type. The available option is `pulsar-inlong`. |
+| topic                         | optional | (none)        | String | Set the input or output topic. Separate multiple topics with a half-width comma (`,`). Specify either `topic` or `topic-pattern`, not both. |
+| topic-pattern                 | optional | (none)        | String | Use a regular expression to match topics.                    |
+| service-url                   | required | (none)        | String | Set the Pulsar broker service address.                       |
+| admin-url                     | required | (none)        | String | Set the Pulsar administration service address.               |
+| scan.startup.mode             | optional | latest        | String | Configure the Source's startup mode. Available options are `earliest`, `latest`, `external-subscription`, and `specific-offsets`. |
+| scan.startup.specific-offsets | optional | (none)        | String | Required when `scan.startup.mode` is set to `specific-offsets`. |
+| scan.startup.sub-name         | optional | (none)        | String | Required when `scan.startup.mode` is set to `external-subscription`. |
+| discovery topic interval      | optional | (none)        | Long   | Set the time interval for partition discovery, in unit of milliseconds. |
+| sink.message-router           | optional | key-hash      | String | Set the routing method for writing messages to the Pulsar partition. Available options are `key-hash`, `round-robin`, and `custom MessageRouter`. |
+| sink.semantic                 | optional | at-least-once | String | Set the delivery guarantee for messages written by the sink. Available options are `at-least-once`, `exactly-once`, and `none`. |
+| properties                    | optional | empty         | Map    | Set Pulsar's optional configurations, in a format of `properties.key='value'`. For details, see [Configuration parameters](https://github.com/streamnative/pulsar-flink#configuration-parameters). |
+| key.format                    | optional | (none)        | String | Set the serialization format for the key of Pulsar messages. No format is set by default; available options include `raw`, `avro`, `json`, etc. |
+| key.fields                    | optional | (none)        | String | The table columns to use when serializing the key. Separate multiple fields with a half-width comma (`,`). |
+| key.fields-prefix             | optional | (none)        | String | Define a custom prefix for all fields in the key format to avoid name conflicts with fields in the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and `key.fields` use the prefixed names. When constructing data types in the key format, the prefix is removed and non-prefixed names are used within the key format. |
+| format or value.format        | required | (none)        | String | The serialization format of the Pulsar message value. Supports JSON, Avro, etc. For more information, see the Flink formats documentation. |
+| value.fields-include          | optional | ALL           | Enum   | The strategy for which fields are included in the Pulsar message value. Available options are `ALL` and `EXCEPT_KEY`. |
+
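+As an illustration of how these options combine, the following is a minimal sketch rather than a verified configuration. It assumes the `pulsar-inlong` connector identifier listed in the table above and a JSON value format; the table name, columns, topic pattern, and subscription name are hypothetical placeholders.
+
+```sql
+-- Hypothetical example: names, topic pattern, and subscription are placeholders.
+CREATE TABLE pulsar_by_pattern (
+  `order_id` BIGINT,
+  `payload` STRING
+) WITH (
+  'connector' = 'pulsar-inlong',
+  -- Assumption: 'topic-pattern' is used here instead of 'topic'.
+  'topic-pattern' = 'persistent://public/default/orders-.*',
+  'service-url' = 'pulsar://localhost:6650',
+  'admin-url' = 'http://localhost:8080',
+  -- The 'external-subscription' mode goes together with 'scan.startup.sub-name'.
+  'scan.startup.mode' = 'external-subscription',
+  'scan.startup.sub-name' = 'my-subscription',
+  'format' = 'json'
+);
+```
+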
+## Available Metadata
+
+The METADATA flag is used to read and write metadata in Pulsar messages. The support list is as follows.
+
+> Note
+> The R/W column defines whether a metadata field is readable (R) and/or writable (W). Read-only columns must be declared VIRTUAL to exclude them during an INSERT INTO operation.
+
+| Key         | Data Type                                  | Description                                   | R/W  |
+| ----------- | ------------------------------------------ | --------------------------------------------- | ---- |
+| topic       | STRING NOT NULL                            | Topic name of the Pulsar message.             | R    |
+| messageId   | BYTES NOT NULL                             | Message ID of the Pulsar message.             | R    |
+| sequenceId  | BIGINT NOT NULL                            | Sequence ID of the Pulsar message.            | R    |
+| publishTime | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | Publishing time of the Pulsar message.        | R    |
+| eventTime   | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | Generation time of the Pulsar message.        | R/W  |
+| properties  | MAP<STRING, STRING> NOT NULL               | Extension information of the Pulsar message.  | R/W  |
+
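+The metadata keys above can be exposed as table columns with Flink's `METADATA FROM` clause. The sketch below is illustrative only: the table and column names are hypothetical, the connector options are copied from the usage example, and the JSON value format is an assumption. Read-only keys (`publishTime`, `messageId`) are declared `VIRTUAL`, while the writable `eventTime` is not.
+
+```sql
+-- Hypothetical example: table and column names are placeholders.
+CREATE TABLE pulsar_with_metadata (
+  `payload` STRING,
+  -- Read-only metadata must be VIRTUAL so it is skipped on INSERT INTO.
+  `publish_ts` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'publishTime' VIRTUAL,
+  `msg_id` BYTES METADATA FROM 'messageId' VIRTUAL,
+  -- Writable metadata can be both read and written.
+  `event_ts` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'eventTime'
+) WITH (
+  'connector' = 'pulsar',
+  'topic' = 'persistent://public/default/topic82547611',
+  'value.format' = 'json',
+  'service-url' = 'pulsar://localhost:6650',
+  'admin-url' = 'http://localhost:8080'
+);
+```
+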
+## Data Type Mapping
+
+Pulsar stores message keys and values as bytes, so Pulsar doesn’t have schema or data types. The Pulsar messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by specific formats. Please refer to [Formats](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/) pages for more details.
\ No newline at end of file
diff --git a/docs/data_node/load_node/iceberg.md b/docs/data_node/load_node/iceberg.md
index 18ebbb444..db4c9c075 100644
--- a/docs/data_node/load_node/iceberg.md
+++ b/docs/data_node/load_node/iceberg.md
@@ -2,11 +2,190 @@
 title: Iceberg
 sidebar_position: 4
 ---
-
 ## Overview
 [Apache Iceberg](https://iceberg.apache.org/) is a high-performance format for huge analytic tables.
 
-## Configuration
-When creating a data flow, select `Iceberg` for the data stream direction, and click "Add" to configure it.
+## Version
+
+| Load Node               | Version                                                    |
+| ----------------------- | ---------------------------------------------------------- |
+| [Iceberg](./iceberg.md) | [Iceberg](https://iceberg.apache.org/): 0.12.x, 0.13.x <br/> |
+
+## Dependencies
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-iceberg</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## Usage
+
+### Usage for SQL API
+
+To create an Iceberg table in Flink, we recommend using the [Flink SQL Client](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html) because it makes the concepts easier for users to understand.
+
+Step.1 Start a standalone Flink cluster in a Hadoop environment.
+
+```bash
+# HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
+export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
+
+# Start the flink standalone cluster
+./bin/start-cluster.sh
+```
+
+Step.2 Start the Flink SQL client.
+
+We have created a separate `flink-runtime` module in the Iceberg project to generate a bundled jar, which can be loaded directly by the Flink SQL client.
+
+To build the `flink-runtime` bundled jar manually, build the `inlong` project; the jar is generated under `<inlong-root-dir>/inlong-sort/sort-connectors/iceberg/target`.
+
+By default, Iceberg includes the Hadoop jars for the Hadoop catalog. To use the Hive catalog, the Hive jars must be loaded when opening the Flink SQL client. InLong automatically packages a bundled Hive jar into the Iceberg connector, so the SQL client can be opened as follows:
+
+```bash
+# HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
+export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
+
+./bin/sql-client.sh embedded -j <flink-runtime-directory>/sort-connector-iceberg-{inlong-version}.jar
+```
+
+Step.3 Create a table in the current Flink catalog
+
+By default, we do not need to create a catalog; the in-memory catalog is used. If `catalog-database.catalog-table` does not exist in the catalog, it is created automatically. Here we just load data into it.
+
+**Table managed in Hive catalog**
+
+The following SQL creates a Flink table in the current Flink catalog that maps to the Iceberg table `default_database.iceberg_table` managed in the Iceberg catalog. Because the default catalog type is `hive`, `catalog-type` does not need to be set here.
+
+```sql
+CREATE TABLE flink_table (
+    id   BIGINT,
+    data STRING
+) WITH (
+    'connector'='iceberg',
+    'catalog-name'='hive_prod',
+    'uri'='thrift://localhost:9083',
+    'warehouse'='hdfs://nn:8020/path/to/warehouse'
+);
+```
+
+If you want to create a Flink table mapping to a different Iceberg table managed in the Hive catalog (such as `hive_db.hive_iceberg_table` in Hive), you can create the Flink table as follows:
+
+```sql
+CREATE TABLE flink_table (
+    id   BIGINT,
+    data STRING
+) WITH (
+    'connector'='iceberg',
+    'catalog-name'='hive_prod',
+    'catalog-database'='hive_db',
+    'catalog-table'='hive_iceberg_table',
+    'uri'='thrift://localhost:9083',
+    'warehouse'='hdfs://nn:8020/path/to/warehouse'
+);
+```
+
+> The underlying catalog database (`hive_db` in the above example) will be created automatically if it does not exist when writing records into the Flink table.
+
+**Table managed in hadoop catalog**
+
+The following SQL creates a Flink table in the current Flink catalog that maps to the Iceberg table `default_database.flink_table` managed in the Hadoop catalog.
+
+```sql
+CREATE TABLE flink_table (
+    id   BIGINT,
+    data STRING
+) WITH (
+    'connector'='iceberg',
+    'catalog-name'='hadoop_prod',
+    'catalog-type'='hadoop',
+    'warehouse'='hdfs://nn:8020/path/to/warehouse'
+);
+```
+
+**Table managed in custom catalog**
+
+The following SQL creates a Flink table in the current Flink catalog that maps to the Iceberg table `default_database.flink_table` managed in a custom catalog.
+
+```sql
+CREATE TABLE flink_table (
+    id   BIGINT,
+    data STRING
+) WITH (
+    'connector'='iceberg',
+    'catalog-name'='custom_prod',
+    'catalog-type'='custom',
+    'catalog-impl'='com.my.custom.CatalogImpl',
+     -- More table properties for the customized catalog
+    'my-additional-catalog-config'='my-value',
+     ...
+);
+```
+
+Please check sections under the Integrations tab for all custom catalogs.
+
+Step.4 Insert data into the Iceberg table
+
+```sql
+INSERT INTO `flink_table`
+    SELECT
+    `id` AS `id`,
+    `d` AS `data`
+    FROM `source_table`;
+```
+
+### Usage for InLong Dashboard
+TODO
+
+### Usage for InLong Manager Client
+TODO
+
+## Iceberg Load Node Options
+
+| Option           | Required                                    | Default | Type    | Description                                                  |
+| ---------------- | ------------------------------------------- | ------- | ------- | ------------------------------------------------------------ |
+| connector        | required                                    | (none)  | String  | Specify what connector to use, here should be `'iceberg'`.   |
+| catalog-type     | required                                    | hive    | String  | `hive` or `hadoop` for built-in catalogs, or left unset for custom catalog implementations using catalog-impl. |
+| catalog-name     | required                                    | (none)  | String  | Catalog name.                                                |
+| catalog-database | required                                    | (none)  | String  | Database name managed in the iceberg catalog.                |
+| catalog-table    | required                                    | (none)  | String  | Table name managed in the underlying iceberg catalog and database. |
+| catalog-impl     | optional for custom catalog                 | (none)  | String  | The fully qualified class name of the custom catalog implementation; must be set if `catalog-type` is unset. |
+| cache-enabled    | optional                                    | true    | Boolean | Whether to enable the catalog cache; the default value is `true`. |
+| uri              | required for hive catalog                   | (none)  | String  | The Hive metastore’s thrift URI.                             |
+| clients          | optional for hive catalog                   | 2       | Integer | The Hive metastore client pool size, default value is 2.     |
+| warehouse        | optional for hadoop catalog or hive catalog | (none)  | String  | For the Hive catalog, the Hive warehouse location; specify this path if you neither set `hive-conf-dir` to a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to the classpath. For the Hadoop catalog, the HDFS directory in which to store metadata files and data files. |
+| hive-conf-dir    | optional for hive catalog                   | (none)  | String  | Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or the Hive configuration file from the classpath) will be overwritten with the `warehouse` value if both `hive-conf-dir` and `warehouse` are set when creating the Iceberg catalog. |
+
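+As a sketch of how the Hive-specific options fit together, the following table definition relies on `hive-conf-dir` instead of `warehouse` and tunes the client pool and catalog cache. It is an illustration under assumptions, not a verified configuration: the table name, the `/path/to/hive/conf` directory, and the chosen values are hypothetical.
+
+```sql
+-- Hypothetical example: table name, paths, and values are placeholders.
+CREATE TABLE flink_table_hive_conf (
+    id   BIGINT,
+    data STRING
+) WITH (
+    'connector'='iceberg',
+    'catalog-name'='hive_prod',
+    'catalog-database'='hive_db',
+    'catalog-table'='hive_iceberg_table',
+    'uri'='thrift://localhost:9083',
+    -- Directory expected to contain hive-site.xml; used here in place of 'warehouse'.
+    'hive-conf-dir'='/path/to/hive/conf',
+    -- Hive metastore client pool size (the default is 2).
+    'clients'='5',
+    -- Disable the catalog cache (the default is true).
+    'cache-enabled'='false'
+);
+```
+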
+## Data Type Mapping
+
+See [Iceberg data types](https://iceberg.apache.org/spec/#schemas-and-data-types) for details. The following table shows how Flink SQL types are converted to Iceberg types when loading data.
 
-![Iceberg Configuration](img/iceberg.png)
\ No newline at end of file
+| Flink SQL Type | Iceberg Type |
+| -------------- | ------------ |
+| CHAR           | STRING       |
+| VARCHAR        | STRING       |
+| STRING         | STRING       |
+| BOOLEAN        | BOOLEAN      |
+| BINARY         | FIXED(L)     |
+| VARBINARY      | BINARY       |
+| DECIMAL        | DECIMAL(P,S) |
+| TINYINT        | INT          |
+| SMALLINT       | INT          |
+| INTEGER        | INT          |
+| BIGINT         | LONG         |
+| FLOAT          | FLOAT        |
+| DOUBLE         | DOUBLE       |
+| DATE           | DATE         |
+| TIME           | TIME         |
+| TIMESTAMP      | TIMESTAMP    |
+| TIMESTAMP_LTZ  | TIMESTAMPTZ  |
+| INTERVAL       | -            |
+| ARRAY          | LIST         |
+| MULTISET       | MAP          |
+| MAP            | MAP          |
+| ROW            | STRUCT       |
+| RAW            | -            |
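+
+To make the mapping concrete, the sketch below declares a Flink table with several of the types listed above and notes, in comments, the Iceberg type each column is expected to map to according to the table. The table and column names are hypothetical, and the Hadoop catalog options are copied from the usage section.
+
+```sql
+-- Hypothetical example: table and column names are placeholders.
+CREATE TABLE typed_iceberg_table (
+    id       BIGINT,              -- Iceberg LONG
+    small_n  SMALLINT,            -- Iceberg INT
+    name     VARCHAR(64),         -- Iceberg STRING
+    price    DECIMAL(10, 2),      -- Iceberg DECIMAL(10,2)
+    created  TIMESTAMP(6),        -- Iceberg TIMESTAMP
+    tags     ARRAY<STRING>,       -- Iceberg LIST
+    attrs    MAP<STRING, STRING>  -- Iceberg MAP
+) WITH (
+    'connector'='iceberg',
+    'catalog-name'='hadoop_prod',
+    'catalog-type'='hadoop',
+    'warehouse'='hdfs://nn:8020/path/to/warehouse'
+);
+```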
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/pulsar.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/pulsar.md
index e452c4e72..1dcd7d2db 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/pulsar.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/pulsar.md
@@ -1,4 +1,125 @@
 ---
 title: Pulsar
 sidebar_position: 10
----
\ No newline at end of file
+---
+
+## Overview
+
+[Apache Pulsar](https://pulsar.apache.org/) is a distributed, open source pub-sub messaging and streaming platform for real-time workloads, managing hundreds of billions of events per day.
+
+## Version
+
+| Extract Node          | Version                                                   |
+| --------------------- | --------------------------------------------------------- |
+| [Pulsar](./pulsar.md) | [Pulsar](https://pulsar.apache.org/docs/next/): >= 2.8.x  |
+
+## Dependencies
+
+```
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-pulsar</artifactId>
+    <!-- Choose the version that matches your InLong application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## Usage
+
+### Usage for SQL API
+
+Step.1 Prepare the SQL client
+
+The [SQL Client](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sqlClient.html) is used to write SQL queries for manipulating data in Pulsar. You can use the `-addclasspath` option to add the `sort-connector-pulsar-{{INLONG_VERSION}}.jar` package.
+
+**Example**
+
+```
+./bin/sql-client.sh embedded --jar sort-connector-pulsar_{{INLONG_VERSION}}.jar
+```
+
+> Note
+> If you put the JAR package of our connector under `$FLINK_HOME/lib`, do not use `--jar` again to specify the package of the connector.
+
+Step.2 Read data from Pulsar
+
+```
+CREATE TABLE pulsar (
+  `physical_1` STRING,
+  `physical_2` INT,
+  `eventTime` TIMESTAMP(3) METADATA,
+  `properties` MAP<STRING, STRING> METADATA ,
+  `topic` STRING METADATA VIRTUAL,
+  `sequenceId` BIGINT METADATA VIRTUAL,
+  `key` STRING ,
+  `physical_3` BOOLEAN
+) WITH (
+  'connector' = 'pulsar',
+  'topic' = 'persistent://public/default/topic82547611',
+  'key.format' = 'raw',
+  'key.fields' = 'key',
+  'value.format' = 'avro',
+  'service-url' = 'pulsar://localhost:6650',
+  'admin-url' = 'http://localhost:8080',
+  'scan.startup.mode' = 'earliest' 
+);
+
+INSERT INTO `sink_table`
+    SELECT
+    `physical_1` AS `physical_1`,
+    `physical_2` AS `physical_2`
+    FROM `pulsar`;
+
+INSERT INTO pulsar
+VALUES
+ ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k11', 'v11', 'k12', 'v12'], 'key1', TRUE),
+ ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', MAP['k21', 'v21', 'k22', 'v22'], 'key2', FALSE),
+ ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k31', 'v31', 'k32', 'v32'], 'key3', TRUE);
+```
+
+### Usage for InLong Dashboard
+
+TODO
+
+### Usage for InLong Manager Client
+
+TODO
+
+## Pulsar Extract Node Options
+
+| Parameter                     | Required | Default value | Type   | Description                                                  |
+| ----------------------------- | -------- | ------------- | ------ | ------------------------------------------------------------ |
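+| connector                     | required | (none)        | String | Set the connector type. The available option is `pulsar-inlong`. |
+| topic                         | optional | (none)        | String | Set the input or output topic. Separate multiple topics with a half-width comma (`,`). Specify either `topic` or `topic-pattern`, not both. |
+| topic-pattern                 | optional | (none)        | String | Use a regular expression to match topics.                    |
+| service-url                   | required | (none)        | String | Set the Pulsar broker service address.                       |
+| admin-url                     | required | (none)        | String | Set the Pulsar administration service address.               |
+| scan.startup.mode             | optional | latest        | String | Configure the Source's startup mode. Available options are `earliest`, `latest`, `external-subscription`, and `specific-offsets`. |
+| scan.startup.specific-offsets | optional | (none)        | String | Required when `scan.startup.mode` is set to `specific-offsets`. |
+| scan.startup.sub-name         | optional | (none)        | String | Required when `scan.startup.mode` is set to `external-subscription`. |
+| discovery topic interval      | optional | (none)        | Long   | Set the time interval for partition discovery, in unit of milliseconds. |
+| sink.message-router           | optional | key-hash      | String | Set the routing method for writing messages to the Pulsar partition. Available options are `key-hash`, `round-robin`, and `custom MessageRouter`. |
+| sink.semantic                 | optional | at-least-once | String | Set the delivery guarantee for messages written by the sink. Available options are `at-least-once`, `exactly-once`, and `none`. |
+| properties                    | optional | empty         | Map    | Set Pulsar's optional configurations, in a format of `properties.key='value'`. For details, see [Configuration parameters](https://github.com/streamnative/pulsar-flink#configuration-parameters). |
+| key.format                    | optional | (none)        | String | Set the serialization format for the key of Pulsar messages. No format is set by default; available options include `raw`, `avro`, `json`, etc. |
+| key.fields                    | optional | (none)        | String | The table columns to use when serializing the key. Separate multiple fields with a half-width comma (`,`). |
+| key.fields-prefix             | optional | (none)        | String | Define a custom prefix for all fields in the key format to avoid name conflicts with fields in the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and `key.fields` use the prefixed names. When constructing data types in the key format, the prefix is removed and non-prefixed names are used within the key format. |
+| format or value.format        | required | (none)        | String | The serialization format of the Pulsar message value. Supports JSON, Avro, etc. For more information, see the Flink formats documentation. |
+| value.fields-include          | optional | ALL           | Enum   | The strategy for which fields are included in the Pulsar message value. Available options are `ALL` and `EXCEPT_KEY`. |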
+
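+## Available Metadata
+
+The METADATA flag is used to read and write metadata in Pulsar messages. The support list is as follows.
+
+> Note
+> The R/W column defines whether a metadata field is readable (R) and/or writable (W). Read-only columns must be declared VIRTUAL to exclude them during an INSERT INTO operation.
+
+| Key         | Data Type                                  | Description                                   | R/W  |
+| ----------- | ------------------------------------------ | --------------------------------------------- | ---- |
+| topic       | STRING NOT NULL                            | Topic name of the Pulsar message.             | R    |
+| messageId   | BYTES NOT NULL                             | Message ID of the Pulsar message.             | R    |
+| sequenceId  | BIGINT NOT NULL                            | Sequence ID of the Pulsar message.            | R    |
+| publishTime | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | Publishing time of the Pulsar message.        | R    |
+| eventTime   | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | Generation time of the Pulsar message.        | R/W  |
+| properties  | MAP<STRING, STRING> NOT NULL               | Extension information of the Pulsar message.  | R/W  |
+
+## Data Type Mapping
+
+Pulsar stores message keys and values as bytes, so Pulsar doesn’t have schema or data types. The Pulsar messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by specific formats. Please refer to the [Formats](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/) pages for more details.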
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/iceberg.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/iceberg.md
index b0825b1fc..e4f63b061 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/iceberg.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/iceberg.md
@@ -3,10 +3,191 @@ title: Iceberg
 sidebar_position: 3
 ---
 
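-## General Overview
-[Apache Iceberg](https://iceberg.apache.org/) is a new format for tracking very large tables.
+## Overview
 
-## Configuration
-When creating a data flow, select `Iceberg` for the data stream direction, and click "Add" to configure it.
+[Apache Iceberg](https://iceberg.apache.org/) is a high-performance format for huge analytic tables.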
 
-![Iceberg Configuration](img/iceberg.png)
\ No newline at end of file
+## Version
+
+| Load Node               | Version                                              |
+| ----------------------- | ---------------------------------------------------- |
+| [Iceberg](./iceberg.md) | [Iceberg](https://iceberg.apache.org/): 0.12.x, 0.13.x |
+
+## Dependencies
+
+```
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-iceberg</artifactId>
+    <!-- Choose the version that matches your InLong application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## Usage
+
+### Usage for SQL API
+
+To create an Iceberg table in Flink, we recommend using the [Flink SQL Client](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html) because it makes the concepts easier for users to understand.
+
+Step.1 Start a standalone Flink cluster in a Hadoop environment.
+
+```
+# HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
+export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
+
+# Start the flink standalone cluster
+./bin/start-cluster.sh
+```
+
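+Step.2 Start the Flink SQL client.
+
+We have created a separate `flink-runtime` module in the Iceberg project to generate a bundled jar, which can be loaded directly by the Flink SQL client.
+
+To build the `flink-runtime` bundled jar manually, build the `inlong` project; the jar is generated under `<inlong-root-dir>/inlong-sort/sort-connectors/iceberg/target`.
+
+By default, Iceberg includes the Hadoop jars for the Hadoop catalog. To use the Hive catalog, the Hive jars must be loaded when opening the Flink SQL client. InLong automatically packages a bundled Hive jar into the Iceberg connector, so the SQL client can be opened as follows:
+
+```
+# HADOOP_HOME is your Hadoop root directory after unpacking the binary package.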
+export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
+
+./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-xxx.jar shell
+```
+
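+Step.3 Create a table in the current Flink catalog
+
+By default, we do not need to create a catalog; the in-memory catalog is used. If `catalog-database.catalog-table` does not exist in the catalog, it is created automatically. Here we just load data into it.
+
+**Table managed in Hive catalog**
+
+The following SQL creates a Flink table in the current Flink catalog that maps to the Iceberg table `default_database.iceberg_table` managed in the Iceberg catalog. Because the default catalog type is `hive`, `catalog-type` does not need to be set here.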
+
+```
+CREATE TABLE flink_table (
+    id   BIGINT,
+    data STRING
+) WITH (
+    'connector'='iceberg',
+    'catalog-name'='hive_prod',
+    'uri'='thrift://localhost:9083',
+    'warehouse'='hdfs://nn:8020/path/to/warehouse'
+);
+```
+
+If you want to create a Flink table mapping to a different Iceberg table managed in the Hive catalog (such as `hive_db.hive_iceberg_table` in Hive), you can create the Flink table as follows:
+
+```
+CREATE TABLE flink_table (
+    id   BIGINT,
+    data STRING
+) WITH (
+    'connector'='iceberg',
+    'catalog-name'='hive_prod',
+    'catalog-database'='hive_db',
+    'catalog-table'='hive_iceberg_table',
+    'uri'='thrift://localhost:9083',
+    'warehouse'='hdfs://nn:8020/path/to/warehouse'
+);
+```
+
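+> The underlying catalog database (`hive_db` in the above example) will be created automatically if it does not exist when writing records into the Flink table.
+
+**Table managed in hadoop catalog**
+
+The following SQL creates a Flink table in the current Flink catalog that maps to the Iceberg table `default_database.flink_table` managed in the Hadoop catalog.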
+
+```
+CREATE TABLE flink_table (
+    id   BIGINT,
+    data STRING
+) WITH (
+    'connector'='iceberg',
+    'catalog-name'='hadoop_prod',
+    'catalog-type'='hadoop',
+    'warehouse'='hdfs://nn:8020/path/to/warehouse'
+);
+```
+
+Step.4 Insert data into the Iceberg table
+
+```
+INSERT INTO `flink_table`
+    SELECT
+    `id` AS `id`,
+    `d` AS `data`
+    FROM `source_table`;
+```
+
+**Table managed in custom catalog**
+
+The following SQL creates a Flink table in the current Flink catalog that maps to the Iceberg table `default_database.flink_table` managed in a custom catalog.
+
+```
+CREATE TABLE flink_table (
+    id   BIGINT,
+    data STRING
+) WITH (
+    'connector'='iceberg',
+    'catalog-name'='custom_prod',
+    'catalog-type'='custom',
+    'catalog-impl'='com.my.custom.CatalogImpl',
+     -- More table properties for the customized catalog
+    'my-additional-catalog-config'='my-value',
+     ...
+);
+```
+
+Please check sections under the Integrations tab for all custom catalogs.
+
+### Usage for InLong Dashboard
+TODO
+
+### Usage for InLong Manager Client
+TODO
+
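+## Iceberg Load Node Options
+
+| Option           | Required                                    | Default | Type    | Description                                                  |
+| ---------------- | ------------------------------------------- | ------- | ------- | ------------------------------------------------------------ |
+| connector        | required                                    | (none)  | String  | Specify what connector to use, here should be `'iceberg'`.   |
+| catalog-type     | required                                    | hive    | String  | `hive` or `hadoop` for built-in catalogs, or left unset for custom catalog implementations using catalog-impl. |
+| catalog-name     | required                                    | (none)  | String  | Catalog name.                                                |
+| catalog-database | required                                    | (none)  | String  | Database name managed in the iceberg catalog.                |
+| catalog-table    | required                                    | (none)  | String  | Table name managed in the underlying iceberg catalog and database. |
+| catalog-impl     | optional for custom catalog                 | (none)  | String  | The fully qualified class name of the custom catalog implementation; must be set if `catalog-type` is unset. |
+| cache-enabled    | optional                                    | true    | Boolean | Whether to enable the catalog cache; the default value is `true`. |
+| uri              | required for hive catalog                   | (none)  | String  | The Hive metastore’s thrift URI.                             |
+| clients          | optional for hive catalog                   | 2       | Integer | The Hive metastore client pool size, default value is 2.     |
+| warehouse        | optional for hadoop catalog or hive catalog | (none)  | String  | For the Hive catalog, the Hive warehouse location; specify this path if you neither set `hive-conf-dir` to a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to the classpath. For the Hadoop catalog, the HDFS directory in which to store metadata files and data files. |
+| hive-conf-dir    | optional for hive catalog                   | (none)  | String  | Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or the Hive configuration file from the classpath) will be overwritten with the `warehouse` value if both `hive-conf-dir` and `warehouse` are set when creating the Iceberg catalog. |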
+
+## Data Type Mapping
+
+See [Iceberg data types](https://iceberg.apache.org/spec/#schemas-and-data-types) for details. The following table shows how Flink SQL types are converted to Iceberg types when loading data.
+
+| Flink SQL Type | Iceberg Type |
+| -------------- | ------------ |
+| CHAR           | STRING       |
+| VARCHAR        | STRING       |
+| STRING         | STRING       |
+| BOOLEAN        | BOOLEAN      |
+| BINARY         | FIXED(L)     |
+| VARBINARY      | BINARY       |
+| DECIMAL        | DECIMAL(P,S) |
+| TINYINT        | INT          |
+| SMALLINT       | INT          |
+| INTEGER        | INT          |
+| BIGINT         | LONG         |
+| FLOAT          | FLOAT        |
+| DOUBLE         | DOUBLE       |
+| DATE           | DATE         |
+| TIME           | TIME         |
+| TIMESTAMP      | TIMESTAMP    |
+| TIMESTAMP_LTZ  | TIMESTAMPTZ  |
+| INTERVAL       | -            |
+| ARRAY          | LIST         |
+| MULTISET       | MAP          |
+| MAP            | MAP          |
+| ROW            | STRUCT       |
+| RAW            | -            |