Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/14 12:40:17 UTC

[incubator-inlong-website] branch master updated: [INLONG-391][Sort] Add some data node detail doc (#392)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
     new 35caf6d63 [INLONG-391][Sort] Add some data node detail doc (#392)
35caf6d63 is described below

commit 35caf6d631b3bb593cff4efd382c8eec1cf073e4
Author: pacino <ge...@gmail.com>
AuthorDate: Tue Jun 14 20:40:11 2022 +0800

    [INLONG-391][Sort] Add some data node detail doc (#392)
---
 docs/data_node/extract_node/kafka.md               | 158 ++++++++++++++++++-
 docs/data_node/extract_node/mysql-cdc.md           |   4 +-
 docs/data_node/extract_node/overview.md            |  16 +-
 docs/data_node/extract_node/postgresql-cdc.md      | 175 ++++++++++++++++++++-
 docs/data_node/load_node/clickhouse.md             | 122 +++++++++++++-
 docs/data_node/load_node/greenplum.md              | 114 +++++++++++++-
 docs/data_node/load_node/hbase.md                  | 123 ++++++++++++++-
 docs/data_node/load_node/kafka.md                  | 100 +++++++++++-
 docs/data_node/load_node/mysql.md                  | 115 +++++++++++++-
 docs/data_node/load_node/oracle.md                 | 114 +++++++++++++-
 docs/data_node/load_node/overview.md               |  26 +--
 docs/data_node/load_node/postgresql.md             | 112 ++++++++++++-
 docs/data_node/load_node/sqlserver.md              | 115 +++++++++++++-
 docs/data_node/load_node/tdsql-postgresql.md       | 112 ++++++++++++-
 .../current/data_node/extract_node/kafka.md        | 155 +++++++++++++++++-
 .../current/data_node/extract_node/overview.md     |  16 +-
 .../data_node/extract_node/postgresql-cdc.md       | 173 +++++++++++++++++++-
 .../current/data_node/load_node/clickhouse.md      | 122 +++++++++++++-
 .../current/data_node/load_node/greenplum.md       | 112 ++++++++++++-
 .../current/data_node/load_node/hbase.md           | 121 +++++++++++++-
 .../current/data_node/load_node/kafka.md           |  98 +++++++++++-
 .../current/data_node/load_node/mysql.md           | 113 ++++++++++++-
 .../current/data_node/load_node/oracle.md          | 111 ++++++++++++-
 .../current/data_node/load_node/overview.md        |  26 +--
 .../current/data_node/load_node/postgresql.md      | 111 ++++++++++++-
 .../current/data_node/load_node/sqlserver.md       | 113 ++++++++++++-
 .../data_node/load_node/tdsql-postgresql.md        | 110 ++++++++++++-
 27 files changed, 2704 insertions(+), 83 deletions(-)

diff --git a/docs/data_node/extract_node/kafka.md b/docs/data_node/extract_node/kafka.md
index 381ac68f9..389dce957 100644
--- a/docs/data_node/extract_node/kafka.md
+++ b/docs/data_node/extract_node/kafka.md
@@ -3,6 +3,158 @@ title: Kafka
 sidebar_position: 4
 ---
 
-## Configuration
-the Dashboard has not supported extracting data from Kafka for this version, 
-you can create Kafka data streams from the background via the [Command-line Tools](user_guide/command_line_tools.md).
\ No newline at end of file
+## Kafka Extract Node
+
+The `Kafka Extract Node` supports reading data from Kafka topics, either in the normal fashion or in the
+upsert fashion. The `upsert-kafka` connector produces a `changelog stream`, where each data record represents an `update` or
+`delete` event. The `kafka-inlong` connector can read data and metadata.
+
+## Supported Version
+
+| Extract Node                | Kafka version |                                                                                                                                                                                                                                                                                                                                                                                           
+|-----------------------------|---------------|
+| [Kafka](./kafka.md)         | 0.10+         |  
+
+## Dependencies  
+
+In order to set up the `Kafka Extract Node`, the following provides dependency information for both projects using a
+build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.
+
+### Maven dependency
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-kafka</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## How to create a Kafka Extract Node
+
+### Usage for SQL API
+
+The example below shows how to create a Kafka Extract Node with `Flink SQL`:
+* connector is `kafka-inlong`
+```sql
+-- Set checkpoint every 3000 milliseconds                       
+Flink SQL> SET 'execution.checkpointing.interval' = '3s';   
+
+-- Create a Kafka table 'kafka_extract_node' in Flink SQL
+Flink SQL> CREATE TABLE kafka_extract_node (
+           `id` INT,
+           `name` STRING
+           ) WITH (
+           'connector' = 'kafka-inlong',
+           'topic' = 'user',
+           'properties.bootstrap.servers' = 'localhost:9092',
+           'properties.group.id' = 'testGroup',
+           'scan.startup.mode' = 'earliest-offset',
+           'format' = 'csv'
+           )
+  
+-- Read data
+Flink SQL> SELECT * FROM kafka_extract_node;
+```
+* connector is `upsert-kafka`
+```sql
+-- Set checkpoint every 3000 milliseconds                       
+Flink SQL> SET 'execution.checkpointing.interval' = '3s';
+
+-- Create a Kafka table 'kafka_extract_node' in Flink SQL
+Flink SQL> CREATE TABLE kafka_extract_node (
+          `id` INT,
+          `name` STRING,
+           PRIMARY KEY (`id`) NOT ENFORCED
+          ) WITH (
+          'connector' = 'upsert-kafka',
+          'topic' = 'user',
+          'properties.bootstrap.servers' = 'localhost:9092',
+          'properties.group.id' = 'testGroup',
+          'scan.startup.mode' = 'earliest-offset',
+          'key.format' = 'csv',
+          'value.format' = 'csv'
+          )
+    
+-- Read data
+Flink SQL> SELECT * FROM kafka_extract_node;       
+```
+### Usage for InLong Dashboard
+
+TODO: It will be supported in the future.
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## Kafka Extract Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify which connector to use, valid values are:  1. for the Upsert Kafka use: `upsert-kafka`  2. for normal Kafka use: `kafka-inlong` |
+| topic | optional | (none) | String | Topic name(s) to read data from when the table is used as source. It also supports  topic list for source by separating topic by semicolon like `topic-1;topic-2`. Note, only one of `topic-pattern` and `topic` can be specified for sources. |
+| topic-pattern | optional | (none) | String | The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of `topic-pattern` and `topic` can be specified for sources. |
+| properties.bootstrap.servers | required | (none) | String | Comma separated list of Kafka brokers. |
+| properties.group.id | required | (none) | String | The id of the consumer group for Kafka source. |
+| properties.* | optional | (none) | String | This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in [Kafka Configuration documentation](https://kafka.apache.org/documentation/#configuration). Flink will remove the `properties.` key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via `properties.allow.auto.create.topics` = `false`. But there are some [...]
+| format | required for normal kafka | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more [format](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/) options. Note: Either this option or the `value.format` option are required. |
+| key.format | optional | (none) | String | The format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Note: If a key format is defined, the 'key.fields' option is required as well. Otherwise the Kafka records will have an empty key. |
+| key.fields | optional | [] | `List<String>` | Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined. The list should look like 'field1;field2'. |
+| key.fields-prefix | optional | (none) | String | Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and 'key.fields' will work with prefixed names. When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format. Please note that this option requires that 'value.fiel [...]
+| value.format | required for upsert Kafka | (none) | String | The [format](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/) used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. |
+| value.fields-include | optional | ALL | Enum Possible values: [ALL, EXCEPT_KEY]| Defines a strategy how to deal with key columns in the data type of the value format. By default, 'ALL' physical columns of the table schema will be included in the value format which means that key columns appear in the data type for both the key and value format |
+| scan.startup.mode | optional | group-offsets | String | Startup mode for Kafka consumer, valid values are 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'. See the following Start Reading Position for more details. |
+| scan.startup.specific-offsets | optional | (none) | String | Specify offsets for each partition in case of 'specific-offsets' startup mode, e.g. 'partition:0,offset:42;partition:1,offset:300'. |
+| scan.startup.timestamp-millis | optional | (none) | Long | Start from the specified epoch timestamp (milliseconds) used in case of 'timestamp' startup mode. |
+| scan.topic-partition-discovery.interval | optional | (none) | Duration | Interval for consumer to discover dynamically created Kafka topics and partitions periodically. |
+
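+As a quick illustration of the startup options above, the sketch below reads from explicitly chosen offsets instead of the group offsets. The partition numbers and offsets are placeholders for illustration only.
+
+```sql
+-- Minimal sketch: start the consumer from specific offsets per partition
+CREATE TABLE `kafka_extract_node_offsets` (
+      `id` INT,
+      `name` STRING
+) WITH (
+      'connector' = 'kafka-inlong',
+      'topic' = 'user',
+      'properties.bootstrap.servers' = 'localhost:9092',
+      'properties.group.id' = 'testGroup',
+      -- read from the listed offsets instead of the committed group offsets
+      'scan.startup.mode' = 'specific-offsets',
+      'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
+      'format' = 'csv'
+)
+```
+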
+## Available Metadata
+
+The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition. Reading metadata is supported for the `canal-json-inlong` format.
+
+| key | Data Type | Description  | 
+|-----|------------|-------------|
+| value.table_name | STRING | Name of the table that contains the row |
+| value.database_name | STRING | Name of the database that contains the row |
+| value.op_ts| TIMESTAMP(3) | It indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the binlog, the value is always 0 |
+| value.op_type| STRING | Operation type, INSERT/UPDATE/DELETE |
+| value.batch_id| BIGINT | Not important, a simple increment counter |
+| value.is_ddl| BOOLEAN | Source does not emit ddl data, value is false |
+| value.update_before| `ARRAY<MAP<STRING, STRING>>` | The update-before data for UPDATE record |
+| value.mysql_type | MAP<STRING, STRING> | MySQL field type |
+| value.pk_names | `ARRAY<STRING>` | Primary key |
+| value.sql_type | MAP<STRING, INT> | SQL field type |
+| value.ts | TIMESTAMP_LTZ(3) | The ts_ms field is used to store the information about the local time at which the connector processed/generated the event |
+
+The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
+
+```sql
+CREATE TABLE `kafka_extract_node` (
+      `id` INT,
+      `name` STRING,
+      `database_name` string METADATA FROM 'value.database_name',
+      `table_name`    string METADATA FROM 'value.table_name',
+      `op_ts`         timestamp(3) METADATA FROM 'value.op_ts',
+      `op_type` string METADATA FROM 'value.op_type',
+      `batch_id` bigint METADATA FROM 'value.batch_id',
+      `is_ddl` boolean METADATA FROM 'value.is_ddl',
+      `update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update_before',
+      `mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql_type',
+      `pk_names` ARRAY<STRING> METADATA FROM 'value.pk_names',
+      `data` STRING METADATA FROM 'value.data',
+      `sql_type` MAP<STRING, INT> METADATA FROM 'value.sql_type',
+      `ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ts'
+) WITH (
+      'connector' = 'kafka-inlong',
+      'topic' = 'user',
+      'properties.bootstrap.servers' = 'localhost:9092',
+      'properties.group.id' = 'testGroup',
+      'scan.startup.mode' = 'earliest-offset',
+      'format' = 'canal-json-inlong'
+)
+```
+
+## Data Type Mapping
+
+Kafka stores message keys and values as bytes, so Kafka doesn’t have schema or data types. The Kafka messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by specific formats. Please refer to [Formats](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/) pages for more details.
diff --git a/docs/data_node/extract_node/mysql-cdc.md b/docs/data_node/extract_node/mysql-cdc.md
index edb1a5e63..52822497c 100644
--- a/docs/data_node/extract_node/mysql-cdc.md
+++ b/docs/data_node/extract_node/mysql-cdc.md
@@ -5,7 +5,7 @@ sidebar_position: 5
 
 ## MySQL Extract Node
 
-The MySQL Extract Node allows for reading snapshot data and incremental data from MySQL database. This document describes how to setup the MySQL Extract Node to run SQL queries against MySQL databases.
+The MySQL Extract Node allows for reading snapshot data and incremental data from MySQL database. This document describes how to set up the MySQL Extract Node to run SQL queries against MySQL databases.
 
 ## Supported Version
 | Extract Node                | Version                                                                                                                                                                                                                                                                                                                                                                                                | Driver                  |
@@ -14,7 +14,7 @@ The MySQL Extract Node allows for reading snapshot data and incremental data fro
 
 ## Dependencies
 
-In order to setup the MySQL Extract Node, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.
+In order to set up the MySQL Extract Node, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.
 
 ### Maven dependency
 
diff --git a/docs/data_node/extract_node/overview.md b/docs/data_node/extract_node/overview.md
index 7c36dcbc9..56129a3b2 100644
--- a/docs/data_node/extract_node/overview.md
+++ b/docs/data_node/extract_node/overview.md
@@ -10,14 +10,14 @@ Extract Nodes is a set of Source Connectors based on <a href="https://flink.apac
 ## Supported Extract Nodes
 | Extract Node                        | Version                                                                                                                                                                                                                                                                                                                                                                                               | Driver                  |
 |-------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------|
-| [kafka](kafka.md)                   | [Kafka](https://kafka.apache.org/): 0.10+                                                                                                                                                                                                                                                                                                                                                             | None                    |
-| [pulsar](pulsar.md)                 | [Pulsar](https://pulsar.apache.org/): 2.8.x+                                                                                                                                                                                                                                                                                                                                                          | None                    |
-| [hdfs](hdfs.md)                     | [HDFS](https://hadoop.apache.org/): 2.x, 3.x                                                                                                                                                                                                                                                                                                                                                          | None                    |
-| [mongodb-cdc](mongodb-cdc.md)       | [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0                                                                                                                                                                                                                                                                                                                                                     | None                    |
-| [mysql-cdc](mysql-cdc.md)           | [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <br/>[RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <br/> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <br/> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <br/> [MariaDB](https://mariadb.org): 10.x <br/> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.21     |
-| [oracle-cdc](oracle-cdc.md)         | [Oracle](https://www.oracle.com/index.html): 11, 12, 19                                                                                                                                                                                                                                                                                                                                               | Oracle Driver: 19.3.0.0 |
-| [postgresql-cdc](postgresql-cdc.md) | [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12                                                                                                                                                                                                                                                                                                                                             | None                    |
-| [sqlserver-cdc](sqlserver-cdc.md)   | [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019                                                                                                                                                                                                                                                                                                                       | None                    |
+| [Kafka](kafka.md)                   | [Kafka](https://kafka.apache.org/): 0.10+                                                                                                                                                                                                                                                                                                                                                             | None                    |
+| [Pulsar](pulsar.md)                 | [Pulsar](https://pulsar.apache.org/): 2.8.x+                                                                                                                                                                                                                                                                                                                                                          | None                    |
+| [HDFS](hdfs.md)                     | [HDFS](https://hadoop.apache.org/): 2.x, 3.x                                                                                                                                                                                                                                                                                                                                                          | None                    |
+| [MongoDB-CDC](mongodb-cdc.md)       | [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0                                                                                                                                                                                                                                                                                                                                                     | None                    |
+| [MySQL-CDC](mysql-cdc.md)           | [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <br/>[RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <br/> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <br/> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <br/> [MariaDB](https://mariadb.org): 10.x <br/> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.21     |
+| [Oracle-CDC](oracle-cdc.md)         | [Oracle](https://www.oracle.com/index.html): 11, 12, 19                                                                                                                                                                                                                                                                                                                                               | Oracle Driver: 19.3.0.0 |
+| [PostgreSQL-CDC](postgresql-cdc.md) | [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12                                                                                                                                                                                                                                                                                                                                             | JDBC Driver: 42.2.12     |
+| [SqlServer-CDC](sqlserver-cdc.md)   | [SQLServer](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019                                                                                                                                                                                                                                                                                                                       | None                    |
 
 ## Supported Flink Versions
 The following table shows the version mapping between InLong<sup>®</sup> Extract Nodes and Flink<sup>®</sup>:
diff --git a/docs/data_node/extract_node/postgresql-cdc.md b/docs/data_node/extract_node/postgresql-cdc.md
index 2466f033e..e231986b4 100644
--- a/docs/data_node/extract_node/postgresql-cdc.md
+++ b/docs/data_node/extract_node/postgresql-cdc.md
@@ -1,4 +1,175 @@
 ---
-title: PostgreSQL
+title: PostgreSQL-CDC
 sidebar_position: 9
----
\ No newline at end of file
+---
+
+## PostgreSQL Extract Node
+
+The `PostgreSQL Extract Node` allows for reading snapshot data and incremental data from PostgreSQL database. This document describes how to set up the `PostgreSQL Extract Node` to run SQL queries against PostgreSQL databases.
+
+## Supported Version
+
+| Extract Node                | Version | Driver                  |
+|-----------------------------|---------|-------------------------|
+| [PostgreSQL-CDC](./postgresql-cdc.md) | [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12 | JDBC Driver: 42.2.12 |
+
+## Dependencies
+
+In order to set up the `PostgreSQL Extract Node`, the following provides dependency information for both projects using a
+build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.
+
+### Maven dependency
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-postgres-cdc</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## Set up the PostgreSQL server
+
+Change Data Capture (CDC) allows you to track and propagate changes in a PostgreSQL database to downstream consumers based on its Write-Ahead Log (WAL).
+You need to ensure that the upstream database is configured to support logical replication. Before using the PostgreSQL connector to monitor the changes committed on a PostgreSQL server, 
+decide which logical decoding plug-in you intend to use. If you plan not to use the native `pgoutput` logical replication stream support, then you must install the logical decoding 
+plug-in into the PostgreSQL server.
+
+### pgoutput
+
+pgoutput is the standard logical decoding output plug-in in PostgreSQL 10+. It is maintained by the PostgreSQL community, and used by PostgreSQL itself for logical replication. 
+This plug-in is always present so no additional libraries need to be installed.
+
+1. Check the `wal_level` configuration setting:
+
+```sql
+SHOW wal_level;
+```
+The default value is `replica`. For CDC, you’ll need to set it to `logical` in the database configuration file (postgresql.conf). Keep in mind that changing the `wal_level` requires a restart of the Postgres instance and can affect database performance.
+
+2. To configure the replication slot regardless of the decoder being used, specify the following in the postgresql.conf file:
+
+```properties
+wal_level = logical 
+```
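+
+After restarting with `wal_level = logical`, you can optionally pre-create and inspect the replication slot that the connector will use. This is only a sketch; the slot name `inlong_slot` is a placeholder and must match the `slot.name` option configured later.
+
+```sql
+-- Optionally create a logical replication slot backed by the pgoutput plug-in
+SELECT pg_create_logical_replication_slot('inlong_slot', 'pgoutput');
+
+-- Verify the slot exists and which plug-in it uses
+SELECT slot_name, plugin, active FROM pg_replication_slots;
+```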
+
+### decoderbufs
+
+decoderbufs is based on Protobuf and maintained by the Debezium community. See [installing decoderbufs](https://github.com/debezium/postgres-decoderbufs) for installation instructions.
+
+1. To load the plug-in at startup, add the following to the postgresql.conf file:
+
+```properties
+shared_preload_libraries = 'decoderbufs'
+```
+2. To configure the replication slot regardless of the decoder being used, specify the following in the postgresql.conf file:
+
+```properties
+wal_level = logical 
+```
+
+## How to create a PostgreSQL Extract Node
+
+### Usage for SQL API
+
+```sql
+CREATE TABLE `postgresTable`(
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'postgres-cdc',
+  'hostname' = 'localhost',
+  'username' = 'postgres',
+  'password' = 'inlong',
+  'database-name' = 'postgres',
+  'schema-name' = 'public',
+  'port' = '5432',
+  'table-name' = 'user',
+  'decoding.plugin.name' = 'pgoutput',
+  'slot.name' = 'feaafacbaddadc'
+)
+```
+
+### Usage for InLong Dashboard
+
+TODO: It will be supported in the future.
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## PostgreSQL Extract Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify what connector to use, here should be `postgres-cdc`.|
+| hostname | required | (none) | String | IP address or hostname of the PostgreSQL database server. |
+| username | required | (none) | String | Username to use when connecting to the PostgreSQL database server. |
+| password | required | (none) | String | Password to use when connecting to the PostgreSQL database server. |
+| database-name | required | (none) | String | Database name of the PostgreSQL server to monitor. |
+| schema-name | required | (none) | String | Schema name of the PostgreSQL database to monitor. |
+| table-name | required | (none) | String | Table name of the PostgreSQL database to monitor. |
+| port | optional | 5432 | Integer | Integer port number of the PostgreSQL database server. |
+| decoding.plugin.name | optional | decoderbufs | String | The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput. |
+| slot.name | optional | flink | String | The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring. Slot names must conform to PostgreSQL replication slot naming rules, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character." |
+| debezium.* | optional | (none) | String | Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from Postgres server. For example: 'debezium.snapshot.mode' = 'never'. See more about the [Debezium's Postgres Connector properties](https://debezium.io/documentation/reference/1.5/connectors/postgresql.html#postgresql-connector-properties). |
+
+**Note**: It is recommended to set a different `slot.name` for each table to avoid the potential error `PSQLException: ERROR: replication slot "flink" is active for PID 974`.  
+**Note**: If you encounter `PSQLException: ERROR: all replication slots are in use Hint: Free one or increase max_replication_slots`, you can delete an unused slot with the following statements.
+```sql
+SELECT * FROM pg_replication_slots;
+-- suppose the slot name is 'flink', delete it
+SELECT pg_drop_replication_slot('flink');
+```
+
+## Available Metadata
+
+The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
+
+| key | Data Type | Description  | 
+|-----|------------|-------------|
+| table_name | STRING NOT NULL | Name of the table that contains the row. |
+| schema_name | STRING NOT NULL | Name of the schema that contains the row. |
+| database_name | STRING NOT NULL | Name of the database that contains the row. |
+| op_ts | TIMESTAMP_LTZ(3) NOT NULL | It indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the change stream, the value is always 0. |
+
+The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
+
+```sql
+CREATE TABLE postgresTable (
+    db_name STRING METADATA FROM 'database_name' VIRTUAL,
+    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
+    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    `name` STRING,
+    `age` INT
+) WITH (
+     'connector' = 'postgres-cdc',
+     'hostname' = 'localhost',
+     'username' = 'postgres',
+     'password' = 'inlong',
+     'database-name' = 'postgres',
+     'schema-name' = 'public',
+     'port' = '5432',
+     'table-name' = 'user',
+     'decoding.plugin.name' = 'pgoutput',
+     'slot.name' = 'feaafacbaddadc'
+);
+```
+
+## Data Type Mapping
+
+| PostgreSQL type | Flink SQL type |
+|-----------------|----------------|
+|                 | TINYINT        |
+| SMALLINT <br/> INT2 <br/> SMALLSERIAL <br/> SERIAL2 | SMALLINT |
+| INTEGER <br/> SERIAL | INT |
+| BIGINT <br/> BIGSERIAL | BIGINT |
+| | DECIMAL(20, 0) |
+| REAL <br/> FLOAT4 | FLOAT |
+| FLOAT8 <br/> DOUBLE PRECISION| DOUBLE |
+| NUMERIC(p, s) <br/> DECIMAL(p, s) | DECIMAL(p, s) |
+| BOOLEAN | BOOLEAN |
+| DATE | DATE |
+| TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
+| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
+| CHAR(n) <br/> CHARACTER(n) <br/> VARCHAR(n) <br/> CHARACTER VARYING(n) <br/> TEXT | STRING |
+| BYTEA | BYTES |
diff --git a/docs/data_node/load_node/clickhouse.md b/docs/data_node/load_node/clickhouse.md
index 8cf4f748f..ac818b5d1 100644
--- a/docs/data_node/load_node/clickhouse.md
+++ b/docs/data_node/load_node/clickhouse.md
@@ -3,10 +3,124 @@ title: ClickHouse
 sidebar_position: 6
 ---
 
-## Overview
-[ClickHouse](https://clickhouse.com/docs/en/intro/)  is a column-oriented database management system (DBMS) for online analytical processing of queries (OLAP).
+## ClickHouse Load Node
+
+The `ClickHouse Load Node` supports writing data into the ClickHouse database. This document describes how to set up the ClickHouse Load
+Node to run SQL queries against the ClickHouse database.
+
+## Supported Version
+
+| Load Node                | Driver | Group Id | Artifact Id | JAR |                                                                                                                                                                                                                                                                                                                                                                                       
+|--------------------------|--------|----------|-------------|-----|
+| [ClickHouse](./clickhouse.md) | ClickHouse  | ru.yandex.clickhouse | clickhouse-jdbc | [Download](https://mvnrepository.com/artifact/ru.yandex.clickhouse/clickhouse-jdbc) |
+
+## Dependencies
+
+In order to set up the `ClickHouse Load Node`, the following provides dependency information for both projects using a
+build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.
+
+### Maven dependency
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-jdbc</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## How to create a ClickHouse Load Node
+
+### Usage for SQL API
+
+```sql
+
+-- MySQL extract node
+CREATE TABLE `mysql_extract_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/read',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+)
+
+-- ClickHouse load node
+CREATE TABLE `clickhouse_load_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.ClickHouseDialect',
+  'url' = 'jdbc:clickhouse://localhost:8123/demo',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+)
+
+-- write data into ClickHouse
+INSERT INTO clickhouse_load_table 
+SELECT id, name , age FROM mysql_extract_table;  
+
+```
+
+### Usage for InLong Dashboard
 
-## Configuration
 When creating a data flow, select `ClickHouse` for the data stream direction, and click "Add" to configure it.
 
-![ClickHouse Configuration](img/clickhouse.png)
\ No newline at end of file
+![ClickHouse Configuration](img/clickhouse.png)
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## ClickHouse Load Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify what connector to use, here should be 'jdbc-inlong'. |
+| url | required | (none) | String | The JDBC database url. |
+| dialect-impl | required | (none) | String | The dialect implementation to use; for ClickHouse it should be `org.apache.inlong.sort.jdbc.dialect.ClickHouseDialect`. |
+| table-name | required | (none) | String | The name of JDBC table to connect. |
+| driver | optional | (none) | String | The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL. |
+| username | optional | (none) | String | The JDBC user name. 'username' and 'password' must both be specified if any of them is specified. |
+| password | optional | (none) | String | The JDBC password. |
+| connection.max-retry-timeout | optional | 60s | Duration | Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second. |
+| sink.buffer-flush.max-rows | optional | 100 | Integer | The max size of buffered records before flush. Can be set to zero to disable it. |
+| sink.buffer-flush.interval | optional | 1s | Duration | The flush interval; once it elapses, asynchronous threads will flush data. Can be set to '0' to disable it. Note, 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. |
+| sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
+| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
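+
+The buffering options above control how writes are batched before they reach ClickHouse. The sketch below only illustrates how they might be combined in the WITH clause; the values shown are assumptions, not recommendations.
+
+```sql
+-- Minimal sketch: ClickHouse load node with explicit flush and retry tuning
+CREATE TABLE `clickhouse_load_tuned`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.ClickHouseDialect',
+  'url' = 'jdbc:clickhouse://localhost:8123/demo',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user',
+  -- flush after 500 buffered rows or every 5 seconds, whichever comes first
+  'sink.buffer-flush.max-rows' = '500',
+  'sink.buffer-flush.interval' = '5s',
+  'sink.max-retries' = '5'
+)
+```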
+
+## Data Type Mapping
+
+| ClickHouse type | Flink SQL type |
+|-----------------|----------------|
+| String          | CHAR           |
+| String <br/> IP <br/> UUID | VARCHAR |
+| String <br/> Enum | STRING |
+| UInt8 | BOOLEAN |
+| FixedString | BYTES |
+| Decimal <br/> Int128 <br/> Int256 <br/> UInt64 <br/> UInt128 <br/> UInt256 | DECIMAL |
+| Int8 | TINYINT |
+| Int16 <br/> UInt8 | SMALLINT |
+| Int32 <br/> UInt16 <br/> Interval | INTEGER |
+| Int64 <br/> UInt32 | BIGINT |
+| Float32 | FLOAT |
+| Date | DATE |
+| DateTime | TIME |
+| DateTime | TIMESTAMP |
+| DateTime | TIMESTAMP_LTZ |
+| Int32 | INTERVAL_YEAR_MONTH |
+| Int64 | INTERVAL_DAY_TIME |
+| Array | ARRAY |
+| Map | MAP |
+| Not supported | ROW |
+| Not supported | MULTISET |
+| Not supported | RAW |
\ No newline at end of file
diff --git a/docs/data_node/load_node/greenplum.md b/docs/data_node/load_node/greenplum.md
index 79a2493d8..643306fd5 100644
--- a/docs/data_node/load_node/greenplum.md
+++ b/docs/data_node/load_node/greenplum.md
@@ -1,4 +1,116 @@
 ---
 title: Greenplum
 sidebar_position: 9
----
\ No newline at end of file
+---
+
+## Greenplum Load Node
+
+The `Greenplum Load Node` supports writing data into the Greenplum database. This document describes how to set up the Greenplum Load
+Node to run SQL queries against the Greenplum database.
+
+## Supported Version
+
+| Load Node                | Driver | Group Id | Artifact Id | JAR |                                                                                                                                                                                                                                                                                                                                                                                       
+|--------------------------|--------|----------|-------------|-----|
+| [Greenplum](./greenplum.md) | PostgreSQL  | org.postgresql | postgresql | [Download](https://jdbc.postgresql.org/download.html) |
+
+## Dependencies
+
+In order to set up the `Greenplum Load Node`, the following provides dependency information for both projects using a
+build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.
+
+### Maven dependency
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-jdbc</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## How to create a Greenplum Load Node
+
+### Usage for SQL API
+
+```sql
+
+-- MySQL extract node
+CREATE TABLE `mysql_extract_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/read',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+)
+
+-- Greenplum load node
+CREATE TABLE `greenplum_load_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:postgresql://localhost:5432/write',
+  'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.GreenplumDialect',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'public.user'
+)
+
+-- write data into Greenplum
+INSERT INTO greenplum_load_table 
+SELECT id, name , age FROM mysql_extract_table;  
+
+```
+
+### Usage for InLong Dashboard
+
+TODO: It will be supported in the future.
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## Greenplum Load Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify what connector to use, here should be 'jdbc-inlong'. |
+| url | required | (none) | String | The JDBC database url. |
+| dialect-impl | required | (none) | String | The dialect implementation to use; for Greenplum it should be `org.apache.inlong.sort.jdbc.dialect.GreenplumDialect`. |
+| table-name | required | (none) | String | The name of JDBC table to connect. |
+| driver | optional | (none) | String | The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL. |
+| username | optional | (none) | String | The JDBC user name. 'username' and 'password' must both be specified if any of them is specified. |
+| password | optional | (none) | String | The JDBC password. |
+| connection.max-retry-timeout | optional | 60s | Duration | Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second. |
+| sink.buffer-flush.max-rows | optional | 100 | Integer | The max size of buffered records before flush. Can be set to zero to disable it. |
+| sink.buffer-flush.interval | optional | 1s | Duration | The flush interval; once it elapses, asynchronous threads will flush data. Can be set to '0' to disable it. Note, 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. |
+| sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
+| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
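+
+Because Greenplum is reached through the PostgreSQL JDBC driver, the `driver` option can normally be derived from the URL. The sketch below only shows how it could be set explicitly; `org.postgresql.Driver` is the standard PostgreSQL driver class.
+
+```sql
+-- Minimal sketch: Greenplum load node with the JDBC driver class set explicitly
+CREATE TABLE `greenplum_load_explicit`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:postgresql://localhost:5432/write',
+  'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.GreenplumDialect',
+  'driver' = 'org.postgresql.Driver',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'public.user'
+)
+```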
+
+## Data Type Mapping
+
+| Greenplum type | Flink SQL type |
+|-----------------|----------------|
+|                 | TINYINT        |
+| SMALLINT <br/> INT2 <br/> SMALLSERIAL <br/> SERIAL2 | SMALLINT |
+| INTEGER <br/> SERIAL | INT |
+| BIGINT <br/> BIGSERIAL | BIGINT |
+| | DECIMAL(20, 0) |
+| REAL <br/> FLOAT4 | FLOAT |
+| FLOAT8 <br/> DOUBLE PRECISION| DOUBLE |
+| NUMERIC(p, s) <br/> DECIMAL(p, s) | DECIMAL(p, s) |
+| BOOLEAN | BOOLEAN |
+| DATE | DATE |
+| TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
+| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
+| CHAR(n) <br/> CHARACTER(n) <br/> VARCHAR(n) <br/> CHARACTER VARYING(n) <br/> TEXT | STRING |
+| BYTEA | BYTES |
\ No newline at end of file
diff --git a/docs/data_node/load_node/hbase.md b/docs/data_node/load_node/hbase.md
index c5d5d973d..fbde8fe13 100644
--- a/docs/data_node/load_node/hbase.md
+++ b/docs/data_node/load_node/hbase.md
@@ -1,4 +1,125 @@
 ---
 title: HBase
 sidebar_position: 10
----
\ No newline at end of file
+---
+
+## HBase Load Node
+
+The `HBase Load Node` supports writing data into the HBase database.
+
+## Supported Version
+
+| Load Node                | HBase version |                                                                                                                                                                                                                                                                                                                                                                                           
+|-----------------------------|---------------|
+| [HBase](./hbase.md)         | 2.2.x     |  
+
+## Dependencies
+
+In order to set up the `HBase Load Node`, the following provides dependency information for both projects using a
+build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.
+
+### Maven dependency
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-hbase</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+## How to create a HBase Load Node
+
+### Usage for SQL API
+
+All the column families in HBase table must be declared as ROW type, the field name maps to the column family name, and 
+the nested field names map to the column qualifier names. There is no need to declare all the families and qualifiers in 
+the schema, users can declare what’s used in the query. Apart from the ROW type fields, the single atomic type field
+(e.g. STRING, BIGINT) will be recognized as the HBase rowkey. The rowkey field can have an arbitrary name, but should be quoted
+using backticks if it is a reserved keyword.
+
+The example below shows how to create a HBase Load Node with `Flink SQL`:
+
+```sql
+-- Create a HBase table 'hbase_load_node' in Flink SQL
+CREATE TABLE hbase_load_node (
+    rowkey STRING,
+    family1 ROW<q1 INT>,
+    family2 ROW<q2 STRING, q3 BIGINT>,
+    family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
+    PRIMARY KEY (rowkey) NOT ENFORCED
+) WITH (
+      'connector' = 'hbase-2.2',
+      'table-name' = 'mytable',
+      'zookeeper.quorum' = 'localhost:2181'
+);
+
+-- use the ROW(...) construction function to construct column families and write data into the HBase table,
+-- assuming the schema of "T" is [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
+INSERT INTO hbase_load_node
+SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
+
+-- scan data from the HBase table
+SELECT rowkey, family1, family3.q4, family3.q6 FROM hbase_load_node;
+
+-- temporal join the HBase table as a dimension table
+SELECT * FROM myTopic
+LEFT JOIN hbase_load_node FOR SYSTEM_TIME AS OF myTopic.proctime
+ON myTopic.key = hbase_load_node.rowkey;
+```
+
+### Usage for InLong Dashboard
+
+TODO: It will be supported in the future.
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## HBase Load Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify what connector to use, valid values are: hbase-2.2: connect to HBase 2.2.x cluster |
+| table-name | required | (none) | String | The name of HBase table to connect. |
+| zookeeper.quorum | required | (none) | String | The HBase Zookeeper quorum. |
+| zookeeper.znode.parent | optional | /hbase | String | The root dir in Zookeeper for HBase cluster. |
+| null-string-literal | optional | null | String | Representation for null values for string fields. HBase source and sink encodes/decodes empty bytes as null values for all types except string type. |
+| sink.buffer-flush.max-size | optional | 2mb | MemorySize | Writing option, maximum size in memory of buffered rows for each writing request. This can improve performance for writing data to HBase database, but may increase the latency. Can be set to '0' to disable it. |
+| sink.buffer-flush.max-rows | optional | 1000 | Integer | Writing option, maximum number of rows to buffer for each writing request. This can improve performance for writing data to HBase database, but may increase the latency. Can be set to '0' to disable it. |
+| sink.buffer-flush.interval | optional | 1s | Duration | Writing option, the interval to flush any buffered rows. This can improve performance for writing data to HBase database, but may increase the latency. Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. |
+| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the HBase sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+| lookup.async | optional | false | Boolean | Whether async lookup is enabled. If true, the lookup will be async. Note, async lookup is only supported by the hbase-2.2 connector. |
+| lookup.cache.max-rows | optional | (none) | Integer | The max number of rows of lookup cache, over this value, the oldest rows will be expired. Note, "lookup.cache.max-rows" and "lookup.cache.ttl" options must all be specified if any of them is specified. Lookup cache is disabled by default. |
+| lookup.cache.ttl | optional | (none) | Duration | The max time to live for each row in the lookup cache; after this time, the oldest rows will be expired. Note, "lookup.cache.max-rows" and "lookup.cache.ttl" options must all be specified if any of them is specified. Lookup cache is disabled by default. |
+| lookup.max-retries | optional | 3 | Integer | The max retry times if lookup database failed. |
+| properties.* | optional | (none) | String | This can set and pass arbitrary HBase configurations. Suffix names must match the configuration key defined in [HBase Configuration documentation](https://hbase.apache.org/2.3/book.html#hbase_default_configurations). Flink will remove the "properties." key prefix and pass the transformed key and values to the underlying HBaseClient. For example, you can add a kerberos authentication parameter 'properties.hbase.security.authentication' = 'kerb [...]
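+
+When the HBase table is used as a dimension table (as in the temporal join above), the lookup options control caching. The sketch below is only an illustration of how they might be set; the cache size and TTL are assumptions.
+
+```sql
+-- Minimal sketch: HBase table used for lookups with a bounded cache
+CREATE TABLE hbase_dim_table (
+    rowkey STRING,
+    family1 ROW<q1 INT>,
+    PRIMARY KEY (rowkey) NOT ENFORCED
+) WITH (
+      'connector' = 'hbase-2.2',
+      'table-name' = 'mytable',
+      'zookeeper.quorum' = 'localhost:2181',
+      -- cache at most 10000 rows, each for at most 10 minutes
+      'lookup.cache.max-rows' = '10000',
+      'lookup.cache.ttl' = '10min',
+      'lookup.async' = 'true'
+);
+```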
+
+## Data Type Mapping
+
+HBase stores all data as byte arrays. The data needs to be serialized and deserialized during read and write operations.
+
+When serializing and de-serializing, Flink HBase connector uses utility class org.apache.hadoop.hbase.util.Bytes provided by HBase (Hadoop) to convert Flink Data Types to and from byte arrays.
+
+The Flink HBase connector encodes null values to empty bytes, and decodes empty bytes to null values for all data types except the string type. For the string type, the null literal is determined by the null-string-literal option.
+
+The data type mappings are as follows:
+
+| Flink SQL type | HBase conversion |
+|-----------------|-----------------|
+| CHAR <br/> VARCHAR <br/> STRING | byte[] toBytes(String s) <br/> String toString(byte[] b) |
+| BOOLEAN | byte[] toBytes(boolean b) <br/> boolean toBoolean(byte[] b) |
+| BINARY <br/> VARBINARY | Returns byte[] as is. |
+| DECIMAL | byte[] toBytes(BigDecimal v) <br/> BigDecimal toBigDecimal(byte[] b) |
+| TINYINT | new byte[] { val } <br/> bytes[0] // returns first and only byte from bytes |
+| SMALLINT | byte[] toBytes(short val) <br/> short toShort(byte[] bytes) |
+| INT | byte[] toBytes(int val) <br/> int toInt(byte[] bytes) |
+| BIGINT | byte[] toBytes(long val) <br/> long toLong(byte[] bytes) |
+| FLOAT | byte[] toBytes(float val) <br/> float toFloat(byte[] bytes) |
+| DOUBLE | byte[] toBytes(double val) <br/> double toDouble(byte[] bytes) |
+| DATE | Stores the number of days since epoch as int value. |
+| TIME | Stores the number of milliseconds of the day as int value. |
+| TIMESTAMP | Stores the milliseconds since epoch as long value. |
+| ARRAY | Not supported |
+| MAP <br/> MULTISET | Not supported |
+| ROW | Not supported |
\ No newline at end of file
diff --git a/docs/data_node/load_node/kafka.md b/docs/data_node/load_node/kafka.md
index 5094d92ea..3add072a9 100644
--- a/docs/data_node/load_node/kafka.md
+++ b/docs/data_node/load_node/kafka.md
@@ -3,7 +3,103 @@ title: Kafka
 sidebar_position: 5
 ---
 
-## Configuration
+## Kafka Load Node
+
+The `Kafka Load Node` supports writing data into Kafka topics, either in the normal fashion or in the
+upsert fashion. The `upsert-kafka` connector can consume a changelog stream. It will write INSERT/UPDATE_AFTER data as
+normal Kafka message values, and write DELETE data as Kafka messages with null values (indicating a tombstone for the key).
+
+## Supported Version
+
+| Load Node                | Kafka version |                                                                                                                                                                                                                                                                                                                                                                                           
+|--------------------------|---------------|
+| [Kafka](./kafka.md)      | 0.10+         |  
+
+## Dependencies
+
+In order to set up the `Kafka Load Node`, the following provides dependency information for both projects using a
+build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.
+
+### Maven dependency
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-kafka</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## How to create a Kafka Load Node
+
+### Usage for SQL API
+
+The example below shows how to create a Kafka Load Node with `Flink SQL`:
+* connector is `kafka-inlong`
+```sql
+-- Create a Kafka table 'kafka_load_node' in Flink SQL
+Flink SQL> CREATE TABLE kafka_load_node (
+           `id` INT,
+           `name` STRING
+           ) WITH (
+           'connector' = 'kafka-inlong',
+           'topic' = 'user',
+           'properties.bootstrap.servers' = 'localhost:9092',
+           'properties.group.id' = 'testGroup',
+           'format' = 'csv'
+           )
+```
+* connector is `upsert-kafka`
+```sql
+-- Create a Kafka table 'kafka_load_node' in Flink SQL
+Flink SQL> CREATE TABLE kafka_load_node (
+          `id` INT,
+          `name` STRING,
+           PRIMARY KEY (`id`) NOT ENFORCED
+          ) WITH (
+          'connector' = 'upsert-kafka',
+          'topic' = 'user',
+          'properties.bootstrap.servers' = 'localhost:9092',
+          'key.format' = 'csv',
+          'value.format' = 'csv'
+          )   
+```
+### Usage for InLong Dashboard
+
 When creating a data flow, select `Kafka` for the data stream direction, and click "Add" to configure it.
 
-![Kafka Configuration](img/kafka.png)
\ No newline at end of file
+![Kafka Configuration](img/kafka.png)
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## Kafka Load Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify which connector to use, valid values are:  1. for the Upsert Kafka use: `upsert-kafka`  2. for normal Kafka use: `kafka-inlong` |
+| topic | required | (none) | String | Topic name to write data to when the table is used as sink. Note, topic list is not supported for sinks. |
+| properties.bootstrap.servers | required | (none) | String | Comma separated list of Kafka brokers. |
+| properties.* | optional | (none) | String | This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in [Kafka Configuration documentation](https://kafka.apache.org/documentation/#configuration). Flink will remove the `properties.` key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via `properties.allow.auto.create.topics` = `false`. But there are some [...]
+| format | required for normal Kafka | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more [format](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/) options. Note: Either this option or the `value.format` option are required. |
+| key.format | optional | (none) | String | The format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Note: If a key format is defined, the 'key.fields' option is required as well. Otherwise the Kafka records will have an empty key. |
+| key.fields | optional | [] | `List<String>` | Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined. The list should look like 'field1;field2'. |
+| key.fields-prefix | optional | (none) | String | Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and 'key.fields' will work with prefixed names. When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format. Please note that this option requires that 'value.fiel [...]
+| value.format | required for upsert Kafka | (none) | String | The [format](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/) used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. |
+| value.fields-include | optional | ALL | Enum. Possible values: [ALL, EXCEPT_KEY] | Defines a strategy for how to deal with key columns in the data type of the value format. By default, 'ALL' physical columns of the table schema will be included in the value format, which means that key columns appear in the data types of both the key and the value format. |
+| sink.partitioner | optional | 'default' | String | Output partitioning from Flink's partitions into Kafka's partitions. Valid values are <br/>`default`: use the kafka default partitioner to partition records. <br/>`fixed`: each Flink partition ends up in at most one Kafka partition. <br/>`round-robin`: a Flink partition is distributed to Kafka partitions sticky round-robin. It only works when record's keys are not specified. Custom FlinkKafkaPartitioner subclass: e.g. 'org.mycompany.My [...]
+| sink.semantic | optional | at-least-once | String | Defines the delivery semantic for the Kafka sink. Valid enumerations are 'at-least-once', 'exactly-once' and 'none'. See [Consistency guarantees](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#consistency-guarantees) for more details. |
+| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+
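+To illustrate the sink-related options above, the sketch below sets an explicit partitioner, delivery semantic, and sink
+parallelism; the option values are examples only and should be adapted to your setup.
+```sql
+-- Example only: tune how records are written to Kafka
+CREATE TABLE kafka_load_node_tuned (
+    `id` INT,
+    `name` STRING
+) WITH (
+    'connector' = 'kafka-inlong',
+    'topic' = 'user',
+    'properties.bootstrap.servers' = 'localhost:9092',
+    'format' = 'csv',
+    'sink.partitioner' = 'round-robin',
+    'sink.semantic' = 'at-least-once',
+    'sink.parallelism' = '2'
+);
+```
+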
+## Available Metadata
+
+It supports writing metadata for the `canal-json-inlong` format.
+
+See the [Kafka Extract Node](../extract_node/kafka.md) for a list of all available metadata fields.
+
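+As a sketch of how such metadata columns can be declared on the load side, assuming the `canal-json-inlong` format and the
+field names documented for the Kafka Extract Node (the topic and server values are placeholders):
+```sql
+CREATE TABLE kafka_meta_load_node (
+    `id` INT,
+    `name` STRING,
+    `database_name` STRING METADATA FROM 'value.database_name',
+    `table_name` STRING METADATA FROM 'value.table_name',
+    `op_type` STRING METADATA FROM 'value.op_type'
+) WITH (
+    'connector' = 'kafka-inlong',
+    'topic' = 'user',
+    'properties.bootstrap.servers' = 'localhost:9092',
+    'format' = 'canal-json-inlong'
+);
+```
+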
+## Data Type Mapping
+
+Kafka stores message keys and values as bytes, so Kafka doesn’t have schema or data types. The Kafka messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by specific formats. Please refer to [Formats](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/) pages for more details.
+
diff --git a/docs/data_node/load_node/mysql.md b/docs/data_node/load_node/mysql.md
index 813cfcb29..8a7bf3962 100644
--- a/docs/data_node/load_node/mysql.md
+++ b/docs/data_node/load_node/mysql.md
@@ -1,4 +1,117 @@
 ---
 title: MySQL
 sidebar_position: 12
----
\ No newline at end of file
+---
+
+## MySQL Load Node
+
+The `MySQL Load Node` supports writing data into the MySQL database. This document describes how to set up the MySQL Load
+Node to run SQL queries against the MySQL database.
+
+## Supported Version
+
+| Load Node                | Driver | Group Id | Artifact Id | JAR |                                                                                                                                                                                                                                                                                                                                                                                       
+|--------------------------|--------|----------|-------------|-----|
+| [MySQL](./mysql.md)      | MySQL  | mysql    | mysql-connector-java | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
+
+## Dependencies
+
+In order to set up the `MySQL Load Node`, the following provides dependency information both for projects using a
+build automation tool (such as Maven or SBT) and for SQL Client with Sort Connectors JAR bundles.
+
+### Maven dependency
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-jdbc</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+The MySQL driver license conflicts with the InLong license, so the MySQL driver has been removed from pom.xml. Users who
+need it can add the driver back to pom.xml before packaging with Maven.
+
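+For reference, a sketch of the driver dependency that could be added back to pom.xml, using the coordinates from the
+Supported Version table above; the version shown is only an example and should match your MySQL server:
+
+```xml
+<!-- Example only: MySQL driver coordinates, add back before packaging if needed -->
+<dependency>
+    <groupId>mysql</groupId>
+    <artifactId>mysql-connector-java</artifactId>
+    <version>8.0.21</version>
+</dependency>
+```
+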
+## How to create a MySQL Load Node
+
+### Usage for SQL API
+
+```sql
+
+-- MySQL extract node
+CREATE TABLE `mysql_extract_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/read',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+);
+
+-- MySQL load node
+CREATE TABLE `mysql_load_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/write',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+);
+
+-- write data into mysql
+INSERT INTO mysql_load_table 
+SELECT id, name , age FROM mysql_extract_table;  
+
+```
+
+### Usage for InLong Dashboard
+
+TODO: It will be supported in the future.
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## MySQL Load Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify what connector to use, here should be 'jdbc-inlong'. |
+| url | required | (none) | String | The JDBC database url. |
+| table-name | required | (none) | String | The name of JDBC table to connect. |
+| driver | optional | (none) | String | The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL. |
+| username | optional | (none) | String | The JDBC user name. 'username' and 'password' must both be specified if any of them is specified. |
+| password | optional | (none) | String | The JDBC password. |
+| connection.max-retry-timeout | optional | 60s | Duration | Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second. |
+| sink.buffer-flush.max-rows | optional | 100 | Integer | The max size of buffered records before flush. Can be set to zero to disable it. |
+| sink.buffer-flush.interval | optional | 1s | Duration | The flush interval; once it elapses, asynchronous threads will flush buffered data. Can be set to '0' to disable it. Note that 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set, allowing for complete asynchronous processing of buffered actions. |
+| sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
+| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+
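+For example, a sketch of how the buffering and retry options above might be tuned on the load node; the values are
+illustrative only and should be adapted to your workload:
+```sql
+-- Example only: flush after 500 rows or 5 seconds, retry failed writes up to 5 times
+CREATE TABLE `mysql_load_table_tuned`(
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT,
+  PRIMARY KEY (`id`) NOT ENFORCED
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/write',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user',
+  'sink.buffer-flush.max-rows' = '500',
+  'sink.buffer-flush.interval' = '5s',
+  'sink.max-retries' = '5'
+);
+```
+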
+## Data Type Mapping
+
+| MySQL type | Flink SQL type |
+|------------|----------------|
+| TINYINT | TINYINT |
+| SMALLINT <br/> TINYINT UNSIGNED| SMALLINT |
+| INT <br/> MEDIUMINT <br/> SMALLINT UNSIGNED | INT |
+| BIGINT <br/> INT UNSIGNED | BIGINT |
+| BIGINT UNSIGNED | DECIMAL(20, 0) |
+| FLOAT | FLOAT |
+| DOUBLE <br/> DOUBLE PRECISION | DOUBLE |
+| NUMERIC(p, s) <br/> DECIMAL(p, s) | DECIMAL(p, s) |
+| BOOLEAN <br/> TINYINT(1) | BOOLEAN |
+| DATE | DATE |
+| TIME [(p)] | TIME [(p)] [WITHOUT TIMEZONE] |
+| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
+| CHAR(n) <br/> VARCHAR(n) <br/> TEXT | STRING |
+| BINARY <br/> VARBINARY <br/> BLOB | BYTES |
+|  |  ARRAY |
\ No newline at end of file
diff --git a/docs/data_node/load_node/oracle.md b/docs/data_node/load_node/oracle.md
index d08324620..86cc536d2 100644
--- a/docs/data_node/load_node/oracle.md
+++ b/docs/data_node/load_node/oracle.md
@@ -1,4 +1,116 @@
 ---
 title: Oracle
 sidebar_position: 13
----
\ No newline at end of file
+---
+
+## Oracle Load Node
+
+The `Oracle Load Node` supports writing data into the Oracle database. This document describes how to set up the Oracle Load
+Node to run SQL queries against the Oracle database.
+
+## Supported Version
+
+| Load Node                | Driver | Group Id | Artifact Id | JAR |                                                                                                                                                                                                                                                                                                                                                                                       
+|--------------------------|--------|----------|-------------|-----|
+| [Oracle](./oracle.md) |  Oracle | com.oracle.database.jdbc | ojdbc8 | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
+
+## Dependencies
+
+In order to set up the `Oracle Load Node`, the following provides dependency information both for projects using a
+build automation tool (such as Maven or SBT) and for SQL Client with Sort Connectors JAR bundles.
+
+### Maven dependency
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-jdbc</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+The Oracle driver license conflicts with the InLong license, so the Oracle driver has been removed from pom.xml. Users who
+need it can add the driver back to pom.xml before packaging with Maven.
+
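+For reference, a sketch of the driver dependency that could be added back to pom.xml, using the coordinates from the
+Supported Version table above; the version shown is only an example:
+
+```xml
+<!-- Example only: Oracle driver coordinates, add back before packaging if needed -->
+<dependency>
+    <groupId>com.oracle.database.jdbc</groupId>
+    <artifactId>ojdbc8</artifactId>
+    <version>19.3.0.0</version>
+</dependency>
+```
+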
+## How to create an Oracle Load Node
+
+### Usage for SQL API
+
+```sql
+
+-- MySQL extract node
+CREATE TABLE `mysql_extract_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/read',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+);
+
+-- Oracle load node
+CREATE TABLE `oracle_load_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:oracle:thin:@//host:port/database',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'public.user'
+);
+
+-- write data into Oracle
+INSERT INTO oracle_load_table 
+SELECT id, name , age FROM mysql_extract_table;  
+
+```
+
+### Usage for InLong Dashboard
+
+TODO: It will be supported in the future.
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## Oracle Load Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify what connector to use, here should be 'jdbc-inlong'. |
+| url | required | (none) | String | The JDBC database url. |
+| table-name | required | (none) | String | The name of JDBC table to connect. |
+| driver | optional | (none) | String | The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL. |
+| username | optional | (none) | String | The JDBC user name. 'username' and 'password' must both be specified if any of them is specified. |
+| password | optional | (none) | String | The JDBC password. |
+| connection.max-retry-timeout | optional | 60s | Duration | Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second. |
+| sink.buffer-flush.max-rows | optional | 100 | Integer | The max size of buffered records before flush. Can be set to zero to disable it. |
+| sink.buffer-flush.interval | optional | 1s | Duration | The flush interval; once it elapses, asynchronous threads will flush buffered data. Can be set to '0' to disable it. Note that 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set, allowing for complete asynchronous processing of buffered actions. |
+| sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
+| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+
+## Data Type Mapping
+
+| Oracle type | Flink SQL type |
+|-----------------|----------------|
+| BINARY_FLOAT    | FLOAT        |
+| BINARY_DOUBLE   | DOUBLE |
+| SMALLINT <br/> FLOAT(s) <br/> DOUBLE PRECISION <br/> REAL <br/> NUMBER(p, s) | DECIMAL(p, s) |
+| DATE | DATE |
+| | DECIMAL(20, 0) |
+| REAL <br/> FLOAT4 | FLOAT |
+| FLOAT8 <br/> DOUBLE PRECISION| DOUBLE |
+| NUMERIC(p, s) <br/> DECIMAL(p, s) | DECIMAL(p, s) |
+| BOOLEAN | BOOLEAN |
+| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
+| CHAR(n) <br/> VARCHAR(n) <br/> CLOB(n) | STRING |
+| RAW(s) <br/> BLOB | BYTES |
+|  | ARRAY |
\ No newline at end of file
diff --git a/docs/data_node/load_node/overview.md b/docs/data_node/load_node/overview.md
index 58fb9dd83..ae7c9549e 100644
--- a/docs/data_node/load_node/overview.md
+++ b/docs/data_node/load_node/overview.md
@@ -10,19 +10,19 @@ Load Nodes is a set of Sink Connectors based on <a href="https://flink.apache.or
 ## Supported Load Nodes
 | Load Node                               | Version                                                                                                                                                                                                                                                                                                                                                                                                | Driver                  |
 |-----------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------|
-| [kafka](kafka.md)                       | [Kafka](https://kafka.apache.org/): 0.10+                                                                                                                                                                                                                                                                                                                                                              | None                    |
-| [hbase](hbase.md)                       | [Hbase](https://hbase.apache.org/): 2.2.x                                                                                                                                                                                                                                                                                                                                                              | None                    |
-| [postgresql](postgresql.md)             | [PostgreSQL](https://www.postgresql.org/): 9.6, 10, 11, 12                                                                                                                                                                                                                                                                                                                                             | None                    |
-| [oracle](oracle.md)                     | [Oracle](https://www.oracle.com/index.html): 11, 12, 19                                                                                                                                                                                                                                                                                                                                                | Oracle Driver: 19.3.0.0 |
-| [mysql](mysql.md)                       | [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <br/> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <br/> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <br/> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <br/> [MariaDB](https://mariadb.org): 10.x <br/> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.21     |
-| [tdsql-postgresql](tdsql-postgresql.md) | [TDSQL-PostgreSQL](https://cloud.tencent.com/document/product/1129): 10.17                                                                                                                                                                                                                                                                                                                             | Oracle Driver: 19.3.0.0 |
-| [greenplum](greenplum.md)               | [Greeplum](https://greenplum.org/): 4.x, 5.x, 6.x                                                                                                                                                                                                                                                                                                                                                      | None                    |
-| [elasticsearch](elasticsearch.md)       | [Elasticsearch](https://www.elastic.co/): 6.x, 7.x                                                                                                                                                                                                                                                                                                                                                     | None                    |
-| [clickhouse](clickhouse.md)             | [ClickHouse](https://clickhouse.com/): 20.7+                                                                                                                                                                                                                                                                                                                                                           | None                    |
-| [hive](hive.md)                         | [Hive](https://hive.apache.org/): 1.x, 2.x, 3.x                                                                                                                                                                                                                                                                                                                                                        | None                    |
-| [sqlserver](sqlserver.md)               | [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019                                                                                                                                                                                                                                                                                                                        | None                    |
-| [hdfs](hdfs.md)                         | [HDFS](https://hadoop.apache.org/): 2.x, 3.x                                                                                                                                                                                                                                                                                                                                                           | None                    |
-| [iceberg](iceberg.md)                   | [Iceberg](https://iceberg.apache.org/): 0.13.1+                                                                                                                                                                                                                                                                                                                                                        | None                    |
+| [Kafka](kafka.md)                       | [Kafka](https://kafka.apache.org/): 0.10+                                                                                                                                                                                                                                                                                                                                                              | None                    |
+| [HBase](hbase.md)                       | [HBase](https://hbase.apache.org/): 2.2.x                                                                                                                                                                                                                                                                                                                                                              | None                    |
+| [PostgreSQL](postgresql.md)             | [PostgreSQL](https://www.postgresql.org/): 9.6, 10, 11, 12                                                                                                                                                                                                                                                                                                                                             | JDBC Driver: 42.2.12    |
+| [Oracle](oracle.md)                     | [Oracle](https://www.oracle.com/index.html): 11, 12, 19                                                                                                                                                                                                                                                                                                                                                | Oracle Driver: 19.3.0.0 |
+| [MySQL](mysql.md)                       | [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <br/> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <br/> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <br/> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <br/> [MariaDB](https://mariadb.org): 10.x <br/> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.21     |
+| [TDSQL-PostgreSQL](tdsql-postgresql.md) | [TDSQL-PostgreSQL](https://cloud.tencent.com/document/product/1129): 10.17                                                                                                                                                                                                                                                                                                                             | JDBC Driver: 42.2.12    |
+| [Greenplum](greenplum.md)               | [Greenplum](https://greenplum.org/): 4.x, 5.x, 6.x                                                                                                                                                                                                                                                                                                                                                     | JDBC Driver: 42.2.12    |
+| [Elasticsearch](elasticsearch.md)       | [Elasticsearch](https://www.elastic.co/): 6.x, 7.x                                                                                                                                                                                                                                                                                                                                                     | None                    |
+| [ClickHouse](clickhouse.md)             | [ClickHouse](https://clickhouse.com/): 20.7+                                                                                                                                                                                                                                                                                                                                                           | JDBC Driver: 0.3.1      |
+| [Hive](hive.md)                         | [Hive](https://hive.apache.org/): 1.x, 2.x, 3.x                                                                                                                                                                                                                                                                                                                                                        | None                    |
+| [SQLServer](sqlserver.md)               | [SQLServer](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019                                                                                                                                                                                                                                                                                                                        | JDBC Driver: 7.2.2.jre8 |
+| [HDFS](hdfs.md)                         | [HDFS](https://hadoop.apache.org/): 2.x, 3.x                                                                                                                                                                                                                                                                                                                                                           | None                    |
+| [Iceberg](iceberg.md)                   | [Iceberg](https://iceberg.apache.org/): 0.13.1+                                                                                                                                                                                                                                                                                                                                                        | None                    |
 
 
 ## Supported Flink Versions
diff --git a/docs/data_node/load_node/postgresql.md b/docs/data_node/load_node/postgresql.md
index bcdacf590..eb1a712c0 100644
--- a/docs/data_node/load_node/postgresql.md
+++ b/docs/data_node/load_node/postgresql.md
@@ -1,4 +1,114 @@
 ---
 title: PostgreSQL
 sidebar_position: 14
----
\ No newline at end of file
+---
+
+## PostgreSQL Load Node
+
+The `PostgreSQL Load Node` supports writing data into the PostgreSQL database. This document describes how to set up the PostgreSQL Load
+Node to run SQL queries against the PostgreSQL database.
+
+## Supported Version
+
+| Load Node                | Driver | Group Id | Artifact Id | JAR |                                                                                                                                                                                                                                                                                                                                                                                       
+|--------------------------|--------|----------|-------------|-----|
+| [PostgreSQL](./postgresql.md) | PostgreSQL  | org.postgresql | postgresql | [Download](https://jdbc.postgresql.org/download.html) |
+
+## Dependencies
+
+In order to set up the `PostgreSQL Load Node`, the following provides dependency information both for projects using a
+build automation tool (such as Maven or SBT) and for SQL Client with Sort Connectors JAR bundles.
+
+### Maven dependency
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-jdbc</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## How to create a PostgreSQL Load Node
+
+### Usage for SQL API
+
+```sql
+
+-- MySQL extract node
+CREATE TABLE `mysql_extract_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/read',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+);
+
+-- PostgreSQL load node
+CREATE TABLE `postgresql_load_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc',
+  'url' = 'jdbc:postgresql://localhost:5432/write',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'public.user'
+);
+
+-- write data into postgresql
+INSERT INTO postgresql_load_table 
+SELECT id, name , age FROM mysql_extract_table;  
+
+```
+
+### Usage for InLong Dashboard
+
+TODO: It will be supported in the future.
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## PostgreSQL Load Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify what connector to use, here should be 'jdbc'. |
+| url | required | (none) | String | The JDBC database url. |
+| table-name | required | (none) | String | The name of JDBC table to connect. |
+| driver | optional | (none) | String | The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL. |
+| username | optional | (none) | String | The JDBC user name. 'username' and 'password' must both be specified if any of them is specified. |
+| password | optional | (none) | String | The JDBC password. |
+| connection.max-retry-timeout | optional | 60s | Duration | Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second. |
+| sink.buffer-flush.max-rows | optional | 100 | Integer | The max size of buffered records before flush. Can be set to zero to disable it. |
+| sink.buffer-flush.interval | optional | 1s | Duration | The flush interval; once it elapses, asynchronous threads will flush buffered data. Can be set to '0' to disable it. Note that 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set, allowing for complete asynchronous processing of buffered actions. |
+| sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
+| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+
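+As an illustration of the `driver` option above, the sketch below pins the PostgreSQL driver class explicitly instead of
+letting it be derived from the URL; all other values are examples only:
+```sql
+-- Example only: set the JDBC driver class explicitly
+CREATE TABLE `postgresql_load_table_explicit`(
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT,
+  PRIMARY KEY (`id`) NOT ENFORCED
+) WITH (
+  'connector' = 'jdbc',
+  'url' = 'jdbc:postgresql://localhost:5432/write',
+  'driver' = 'org.postgresql.Driver',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'public.user'
+);
+```
+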
+## Data Type Mapping
+
+| PostgreSQL type | Flink SQL type |
+|-----------------|----------------|
+|                 | TINYINT        |
+| SMALLINT <br/> INT2 <br/> SMALLSERIAL <br/> SERIAL2 | SMALLINT |
+| INTEGER <br/> SERIAL | INT |
+| BIGINT <br/> BIGSERIAL | BIGINT |
+| | DECIMAL(20, 0) |
+| REAL <br/> FLOAT4 | FLOAT |
+| FLOAT8 <br/> DOUBLE PRECISION| DOUBLE |
+| NUMERIC(p, s) <br/> DECIMAL(p, s) | DECIMAL(p, s) |
+| BOOLEAN | BOOLEAN |
+| DATE | DATE |
+| TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
+| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
+| CHAR(n) <br/> CHARACTER(n) <br/> VARCHAR(n) <br/> CHARACTER VARYING(n) <br/> TEXT | STRING |
+| BYTEA | BYTES |
\ No newline at end of file
diff --git a/docs/data_node/load_node/sqlserver.md b/docs/data_node/load_node/sqlserver.md
index d44335d9b..99ef360f4 100644
--- a/docs/data_node/load_node/sqlserver.md
+++ b/docs/data_node/load_node/sqlserver.md
@@ -1,4 +1,115 @@
 ---
-title: SqlServer
+title: SQLServer
 sidebar_position: 15
----
\ No newline at end of file
+---
+
+## SQLServer Load Node
+
+The `SQLServer Load Node` supports writing data into the SQLServer database. This document describes how to set up the SQLServer Load
+Node to run SQL queries against the SQLServer database.
+
+## Supported Version
+
+| Load Node                | Driver | Group Id | Artifact Id | JAR |                                                                                                                                                                                                                                                                                                                                                                                       
+|--------------------------|--------|----------|-------------|-----|
+| [SQLServer](./sqlserver.md) | SQL Server  | com.microsoft.sqlserver | mssql-jdbc | [Download](https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc) |
+
+## Dependencies
+
+In order to set up the `SQLServer Load Node`, the following provides dependency information both for projects using a
+build automation tool (such as Maven or SBT) and for SQL Client with Sort Connectors JAR bundles.
+
+### Maven dependency
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-jdbc</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## How to create an SQLServer Load Node
+
+### Usage for SQL API
+
+```sql
+
+-- MySQL extract node
+CREATE TABLE `mysql_extract_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/read',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+);
+
+-- SQLServer load node
+CREATE TABLE `sqlserver_load_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:sqlserver://localhost:1433;databaseName=column_type_test',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'dbo.work1'
+);
+
+-- write data into SQLServer
+INSERT INTO sqlserver_load_table 
+SELECT id, name , age FROM mysql_extract_table;  
+
+```
+
+### Usage for InLong Dashboard
+
+TODO: It will be supported in the future.
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## SQLServer Load Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify what connector to use, here should be 'jdbc-inlong'. |
+| url | required | (none) | String | The JDBC database url. |
+| table-name | required | (none) | String | The name of JDBC table to connect. |
+| driver | optional | (none) | String | The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL. |
+| username | optional | (none) | String | The JDBC user name. 'username' and 'password' must both be specified if any of them is specified. |
+| password | optional | (none) | String | The JDBC password. |
+| connection.max-retry-timeout | optional | 60s | Duration | Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second. |
+| sink.buffer-flush.max-rows | optional | 100 | Integer | The max size of buffered records before flush. Can be set to zero to disable it. |
+| sink.buffer-flush.interval | optional | 1s | Duration | The flush interval; once it elapses, asynchronous threads will flush buffered data. Can be set to '0' to disable it. Note that 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set, allowing for complete asynchronous processing of buffered actions. |
+| sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
+| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+
+## Data Type Mapping
+
+| SQLServer type | Flink SQL type |
+|----------------|----------------|
+| char(n)       | CHAR(n)        |
+| varchar(n) <br/> nvarchar(n) <br/> nchar(n) | VARCHAR(n) |
+| text <br/> ntext <br/> xml | STRING |
+| decimal(p, s) <br/> money <br/> smallmoney | DECIMAL(p, s) |
+| numeric  | NUMERIC |
+| float <br/> real | FLOAT |
+| bit | BOOLEAN |
+| int | INT |
+| tinyint | TINYINT |
+| smallint | SMALLINT |
+| bigint | BIGINT |
+| time(n) | TIME(n) |
+| datetime2 <br/> datetime <br/> smalldatetime | TIMESTAMP(n) |
+| datetimeoffset | TIMESTAMP_LTZ(3) |
\ No newline at end of file
diff --git a/docs/data_node/load_node/tdsql-postgresql.md b/docs/data_node/load_node/tdsql-postgresql.md
index 35bef838b..30a0ce7f8 100644
--- a/docs/data_node/load_node/tdsql-postgresql.md
+++ b/docs/data_node/load_node/tdsql-postgresql.md
@@ -1,4 +1,114 @@
 ---
 title: TDSQL-PostgreSQL
 sidebar_position: 16
----
\ No newline at end of file
+---
+
+## TDSQL-PostgreSQL Load Node
+
+The `TDSQL-PostgreSQL Load Node` supports writing data into the TDSQL-PostgreSQL database. This document describes how to set up the TDSQL-PostgreSQL Load
+Node to run SQL queries against the TDSQL-PostgreSQL database.
+
+## Supported Version
+
+| Load Node                | Driver | Group Id | Artifact Id | JAR |                                                                                                                                                                                                                                                                                                                                                                                       
+|--------------------------|--------|----------|-------------|-----|
+| [TDSQL-PostgreSQL](./tdsql-postgresql.md) | PostgreSQL  | org.postgresql | postgresql | [Download](https://jdbc.postgresql.org/download.html) |
+
+## Dependencies
+
+In order to set up the `TDSQL-PostgreSQL Load Node`, the following provides dependency information both for projects using a
+build automation tool (such as Maven or SBT) and for SQL Client with Sort Connectors JAR bundles.
+
+### Maven dependency
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-jdbc</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## How to create a TDSQL-PostgreSQL Load Node
+
+### Usage for SQL API
+
+```sql
+
+-- MySQL extract node
+CREATE TABLE `mysql_extract_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/read',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+);
+
+-- TDSQL-PostgreSQL load node
+CREATE TABLE `tdsql_postgresql_load_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:postgresql://localhost:5432/write',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'public.user'
+);
+
+-- write data into TDSQL-PostgreSQL
+INSERT INTO tdsql_postgresql_load_table 
+SELECT id, name , age FROM mysql_extract_table;  
+
+```
+
+### Usage for InLong Dashboard
+
+TODO: It will be supported in the future.
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## TDSQL-PostgreSQL Load Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify what connector to use, here should be 'jdbc-inlong'. |
+| url | required | (none) | String | The JDBC database url. |
+| table-name | required | (none) | String | The name of JDBC table to connect. |
+| driver | optional | (none) | String | The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL. |
+| username | optional | (none) | String | The JDBC user name. 'username' and 'password' must both be specified if any of them is specified. |
+| password | optional | (none) | String | The JDBC password. |
+| connection.max-retry-timeout | optional | 60s | Duration | Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second. |
+| sink.buffer-flush.max-rows | optional | 100 | Integer | The max size of buffered records before flush. Can be set to zero to disable it. |
+| sink.buffer-flush.interval | optional | 1s | Duration | The flush interval; once it elapses, asynchronous threads will flush buffered data. Can be set to '0' to disable it. Note that 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set, allowing for complete asynchronous processing of buffered actions. |
+| sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
+| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+
+## Data Type Mapping
+
+| TDSQL-PostgreSQL type | Flink SQL type |
+|-----------------|----------------|
+|                 | TINYINT        |
+| SMALLINT <br/> INT2 <br/> SMALLSERIAL <br/> SERIAL2 | SMALLINT |
+| INTEGER <br/> SERIAL | INT |
+| BIGINT <br/> BIGSERIAL | BIGINT |
+| | DECIMAL(20, 0) |
+| REAL <br/> FLOAT4 | FLOAT |
+| FLOAT8 <br/> DOUBLE PRECISION| DOUBLE |
+| NUMERIC(p, s) <br/> DECIMAL(p, s) | DECIMAL(p, s) |
+| BOOLEAN | BOOLEAN |
+| DATE | DATE |
+| TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
+| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
+| CHAR(n) <br/> CHARACTER(n) <br/> VARCHAR(n) <br/> CHARACTER VARYING(n) <br/> TEXT | STRING |
+| BYTEA | BYTES |
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/kafka.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/kafka.md
index 4ead68435..bf0363a52 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/kafka.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/kafka.md
@@ -2,7 +2,156 @@
 title: Kafka
 sidebar_position: 4
 ---
+## Kafka Extract 节点
 
-## 配置
-当前版本 Dashboard 还不支持从 Kafka 中抽取数据,
-你可以通过 [Command-line Tools](user_guide/command_line_tools.md) 命令行工具从后台创建 Kafka 数据流。
\ No newline at end of file
+`Kafka Extract` 节点 支持从 Kafka topics 中读取数据。它支持以普通的方式读取数据和 Upsert 的方式读取数据。`upsert-kafka` 连接器生产 `changelog` 流,
+其中每条数据记录代表一个更新或删除事件。`kafka-inlong` 连接器可以以普通方式读取数据和元数据信息。
+
+## 支持的版本
+
+| Extract 节点                | Kafka 版本 |                                                                                                                                                                                                                                                                                                                                                                                           
+|-----------------------------|-----------|
+| [Kafka](./kafka.md)         | 0.10+     |  
+
+## 依赖
+
+为了设置 Kafka Extract 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。
+
+### Maven 依赖
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-kafka</artifactId>
+    <!-- 选择适合你的 Application 的版本 -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## 如何创建 Kafka Extract 节点
+
+### SQL API 用法
+
+下面这个例子展示了如何用 `Flink SQL` 创建一个 Kafka Extract 节点:
+* 连接器是 `kafka-inlong`
+```sql
+-- 设置 Checkpoint 为 3000 毫秒                      
+Flink SQL> SET 'execution.checkpointing.interval' = '3s';   
+
+-- 使用 Flink SQL 创建 Kafka 表 'kafka_extract_node'
+Flink SQL> CREATE TABLE kafka_extract_node (
+           `id` INT,
+           `name` STRING
+           ) WITH (
+           'connector' = 'kafka-inlong',
+           'topic' = 'user',
+           'properties.bootstrap.servers' = 'localhost:9092',
+           'properties.group.id' = 'testGroup',
+           'scan.startup.mode' = 'earliest-offset',
+           'format' = 'csv'
+           );
+  
+-- 读取数据
+Flink SQL> SELECT * FROM kafka_extract_node;
+```
+* 连接器是 `upsert-kafka`
+```sql
+-- 设置 Checkpoint 为 3000 毫秒                       
+Flink SQL> SET 'execution.checkpointing.interval' = '3s';
+
+-- 使用 Flink SQL 创建 Kafka 表 'kafka_extract_node'
+Flink SQL> CREATE TABLE kafka_extract_node (
+          `id` INT,
+          `name` STRING,
+           PRIMARY KEY (`id`) NOT ENFORCED
+          ) WITH (
+          'connector' = 'upsert-kafka',
+          'topic' = 'user',
+          'properties.bootstrap.servers' = 'localhost:9092',
+          'properties.group.id' = 'testGroup',
+          'scan.startup.mode' = 'earliest-offset',
+          'key.format' = 'csv',
+          'value.format' = 'csv'
+          );
+    
+-- 读取数据
+Flink SQL> SELECT * FROM kafka_extract_node;       
+```
+### InLong Dashboard 用法
+
+TODO: 将在未来支持此功能。
+
+### InLong Manager Client 用法
+
+TODO: 将在未来支持此功能。
+
+## Kafka Extract 节点参数
+
+| 参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
+|---------|----------|---------|------|------------|
+| connector | 必选 | (none) | String | 指定要使用的连接器:Upsert Kafka 场景使用 `upsert-kafka`,普通 Kafka 场景使用 `kafka-inlong`。 |
+| topic | 可选 | (none) | String | 当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 `topic-1;topic-2`。注意,对 source 表而言,`topic` 和 `topic-pattern` 两个选项只能使用其中一个。 |
+| topic-pattern | 可选 | (none) | String | 匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。注意,对 source 表而言,`topic` 和 `topic-pattern` 两个选项只能使用其中一个。 |
+| properties.bootstrap.servers | 必选 | (none) | String | 逗号分隔的 Kafka broker 列表。 |
+| properties.group.id | 必选 | (none) | String | Kafka source 的消费组 id。 |
+| properties.* | 可选 | (none) | String | 可以设置和传递任意 Kafka 的配置项。后缀名必须匹配在 [Kafka 配置文档](https://kafka.apache.org/documentation/#configuration) 中定义的配置键。Flink 将移除 "properties." 配置键前缀并将变换后的配置键和值传入底层的 Kafka 客户端。例如,你可以通过 'properties.allow.auto.create.topics' = 'false' 来禁用 topic 的自动创建。但是某些配置项不支持进行配置,因为 Flink 会覆盖这些配置,例如 'key.deserializer' 和 'value.deserializer'。 |
+| format | 对于 Kafka 必选 | (none) | String | 用来序列化或反序列化 Kafka 消息的格式。 请参阅 [格式](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/) 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 'value.format' 二者必需其一。 |
+| key.format | 可选 | (none) | String | 用来序列化和反序列化 Kafka 消息键(Key)的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:如果定义了键格式,则配置项 'key.fields' 也是必需的。 否则 Kafka 记录将使用空值作为键。 |
+| key.fields | 可选 | [] | `List<String>` | 表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。 列表格式为 'field1;field2'。 |
+| key.fields-prefix | 可选 | (none) | String | 为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 'key.fields' 都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 请注意该配置项要求必须将 'value.fields-include' 配置为 'EXCEPT_KEY'。 |
+| value.format | 对于 Upsert Kafka 必选 | (none) | String | 用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 'csv'、'json'、'avro'。请参考[格式](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/overview/)页面以获取更多详细信息和格式参数。 |
+| value.fields-include | 可选 | ALL | String | 控制哪些字段应该出现在 value 中。可取值:<br/> ALL:消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。<br/> EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。 |
+| scan.startup.mode | 可选 | group-offsets | String | Kafka consumer 的启动模式。有效值为:'earliest-offset','latest-offset','group-offsets','timestamp' 和 'specific-offsets'。 请参阅下方 起始消费位点 以获取更多细节。 |
+| scan.startup.specific-offsets | 可选 | (none) | String | 在使用 'specific-offsets' 启动模式时为每个 partition 指定 offset,例如 'partition:0,offset:42;partition:1,offset:300'。 |
+| scan.startup.timestamp-millis | 可选 | (none) | Long | 在使用 'timestamp' 启动模式时指定启动的时间戳(单位毫秒)。 |
+| scan.topic-partition-discovery.interval | 可选 | (none) | Duration | Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。 |
+
+## 可用的元数据字段
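+下面给出一个仅作示意的示例,演示如何结合 `scan.startup.mode` 与 `scan.startup.specific-offsets` 从指定位点开始消费,其中的 offset 取值仅为示例:
+```sql
+-- 仅作示意:从每个 partition 的指定 offset 开始消费
+CREATE TABLE kafka_extract_node_offsets (
+    `id` INT,
+    `name` STRING
+) WITH (
+    'connector' = 'kafka-inlong',
+    'topic' = 'user',
+    'properties.bootstrap.servers' = 'localhost:9092',
+    'properties.group.id' = 'testGroup',
+    'scan.startup.mode' = 'specific-offsets',
+    'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
+    'format' = 'csv'
+);
+```
+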
+
+以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。 它支持读取格式 `canal-json-inlong` 的元数据。
+
+| 字段名称 | 数据类型 | 描述  | 
+|-----|------------|-------------|
+| value.table_name | STRING | 包含该行的表的名称 | 
+| value.database_name | STRING | 包含该行的数据库的名称  |
+| value.op_ts| TIMESTAMP(3) | 它指示在数据库中进行更改的时间。 如果记录是从表的快照而不是binlog中读取的,则该值始终为0 |
+| value.op_type| STRING | 操作类型, INSERT/UPDATE/DELETE |
+| value.batch_id| BIGINT | 不重要的, 一个简单的自增器 |
+| value.is_ddl| BOOLEAN | 不下发 DDL, 值是 false |
+| value.update_before| `ARRAY<MAP<STRING, STRING>>` | UPDATE 记录的 update-before 数据 |
+| value.mysql_type | MAP<STRING, STRING> | MySQL 字段类型 |
+| value.pk_names | `ARRAY<STRING>` | 主键 |
+| value.sql_type | MAP<STRING, INT> | SQL 字段类型 |
+| value.ts | TIMESTAMP_LTZ(3) | ts_ms 字段用于存储有关连接器处理/生成事件的本地时间的信息 |
+
+扩展的 CREATE TABLE 示例演示了使用这些元数据字段的语法:
+
+```sql
+CREATE TABLE `kafka_extract_node` (
+      `id` INT,
+      `name` STRING,
+      `database_name` string METADATA FROM 'value.database_name',
+      `table_name`    string METADATA FROM 'value.table_name',
+      `op_ts`         timestamp(3) METADATA FROM 'value.op_ts',
+      `op_type` string METADATA FROM 'value.op_type',
+      `batch_id` bigint METADATA FROM 'value.batch_id',
+      `is_ddl` boolean METADATA FROM 'value.is_ddl',
+      `update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update_before',
+      `mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql_type',
+      `pk_names` ARRAY<STRING> METADATA FROM 'value.pk_names',
+      `data` STRING METADATA FROM 'value.data',
+      `sql_type` MAP<STRING, INT> METADATA FROM 'value.sql_type',
+      `ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ts',
+) WITH (
+      'connector' = 'kafka-inlong',
+      'topic' = 'user',
+      'properties.bootstrap.servers' = 'localhost:9092',
+      'properties.group.id' = 'testGroup',
+      'scan.startup.mode' = 'earliest-offset',
+      'format' = 'canal-json-inlong'
+)
+```
+
+## 数据类型映射
+
+Kafka 将消息键值以二进制进行存储,因此 Kafka 并不存在 schema 或数据类型。Kafka 消息使用格式配置进行序列化和反序列化,例如 csv,json,avro。 因此,数据类型映射取决于使用的格式。请参阅 [格式](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/overview/) 页面以获取更多细节。
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/overview.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/overview.md
index cc2871259..0bf0cfb7b 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/overview.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/overview.md
@@ -11,14 +11,14 @@ Extract 节点列表是一组基于 <a href="https://flink.apache.org/">Apache F
 
 | Extract 节点                          | 版本                                                                                                                                                                                                                                                                                                                                                                                                    | 驱动包                     |
 |-------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------|
-| [kafka](kafka.md)                   | [Kafka](https://kafka.apache.org/): 0.10+                                                                                                                                                                                                                                                                                                                                                             | None                    |
-| [pulsar](pulsar.md)                 | [Pulsar](https://pulsar.apache.org/): 2.8.x+                                                                                                                                                                                                                                                                                                                                                          | None                    |
-| [hdfs](hdfs.md)                     | [HDFS](https://hadoop.apache.org/): 2.x, 3.x                                                                                                                                                                                                                                                                                                                                                          | None                    |
-| [mongodb-cdc](mongodb-cdc.md)       | [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0                                                                                                                                                                                                                                                                                                                                                     | None                    |
-| [mysql-cdc](mysql-cdc.md)           | [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <br/>[RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <br/> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <br/> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <br/> [MariaDB](https://mariadb.org): 10.x <br/> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.21     |
-| [oracle-cdc](oracle-cdc.md)         | [Oracle](https://www.oracle.com/index.html): 11, 12, 19                                                                                                                                                                                                                                                                                                                                               | Oracle Driver: 19.3.0.0 |
-| [postgresql-cdc](postgresql-cdc.md) | [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12                                                                                                                                                                                                                                                                                                                                             | None                    |
-| [sqlserver-cdc](sqlserver-cdc.md)   | [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019                                                                                                                                                                                                                                                                                                                       | None                    |
+| [Kafka](kafka.md)                   | [Kafka](https://kafka.apache.org/): 0.10+                                                                                                                                                                                                                                                                                                                                                             | None                    |
+| [Pulsar](pulsar.md)                 | [Pulsar](https://pulsar.apache.org/): 2.8.x+                                                                                                                                                                                                                                                                                                                                                          | None                    |
+| [HDFS](hdfs.md)                     | [HDFS](https://hadoop.apache.org/): 2.x, 3.x                                                                                                                                                                                                                                                                                                                                                          | None                    |
+| [MongoDB-CDC](mongodb-cdc.md)       | [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0                                                                                                                                                                                                                                                                                                                                                     | None                    |
+| [MySQL-CDC](mysql-cdc.md)           | [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <br/>[RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <br/> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <br/> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <br/> [MariaDB](https://mariadb.org): 10.x <br/> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.21     |
+| [Oracle-CDC](oracle-cdc.md)         | [Oracle](https://www.oracle.com/index.html): 11, 12, 19                                                                                                                                                                                                                                                                                                                                               | Oracle Driver: 19.3.0.0 |
+| [PostgreSQL-CDC](postgresql-cdc.md) | [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12                                                                                                                                                                                                                                                                                                                                             | JDBC Driver: 42.2.12     |
+| [SQLServer-CDC](sqlserver-cdc.md)   | [SQLServer](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019                                                                                                                                                                                                                                                                                                                       | None                    |
 
 ## 支持的 Flink 版本列表
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/postgresql-cdc.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/postgresql-cdc.md
index 2466f033e..6d2b956ea 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/postgresql-cdc.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/postgresql-cdc.md
@@ -1,4 +1,175 @@
 ---
 title: PostgreSQL
 sidebar_position: 9
----
\ No newline at end of file
+---
+
+## PostgreSQL Extract 节点
+
+`PostgreSQL Extract` 节点允许从 PostgreSQL 数据库中读取快照数据和增量数据。 本文档描述了如何设置 PostgreSQL Extract 节点以对 PostgreSQL 数据库运行 SQL 查询。
+
+## 支持版本
+
+| Extract 节点                | 版本 | 驱动                  |
+|-----------------------------|---------|-------------------------|
+| [PostgreSQL-CDC](./postgresql-cdc.md) | [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12 | JDBC Driver: 42.2.12 |
+
+## 依赖
+
+为了设置 PostgreSQL Extract 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。
+
+### Maven 依赖
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-postgres-cdc</artifactId>
+    <!-- 选择适合你的 Application 的版本 -->
+    <version>inlong_version</version>
+</dependency>
+```
+## 设置 PostgreSQL 服务
+
+更改数据捕获 (CDC) 允许您基于 PostgreSQL 的预写日志 (WAL) 跟踪数据库中的变更,并将其传播给下游消费者。
+您需要确保上游数据库已配置为支持逻辑复制。在使用 PostgreSQL 连接器监控 PostgreSQL 服务器上提交的更改之前,
+请先确定要使用哪个逻辑解码插件。如果不打算使用内置的 `pgoutput` 逻辑复制流,则必须在 PostgreSQL 服务器上
+安装相应的逻辑解码插件。
+
+### pgoutput
+
+pgoutput 是 PostgreSQL 10+ 中的标准逻辑解码输出插件。 它由 PostgreSQL 社区维护,并由 PostgreSQL 本身用于逻辑复制。
+此插件始终存在,因此无需安装其他库。
+
+1. 检查`wal_level`配置设置:
+
+```sql
+SHOW wal_level;
+```
+默认值为 `replica`。对于 CDC,您需要在数据库配置文件 (postgresql.conf) 中将其设置为 `logical`(也可以使用下文的 ALTER SYSTEM 示例)。请注意,更改 wal_level 需要重启 Postgres 实例,并且可能会影响数据库性能。
+
+2. 请在 postgresql.conf 文件中指定以下内容:
+
+```properties
+wal_level = logical 
+```
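+
+如果不方便直接编辑 postgresql.conf,也可以在拥有超级用户权限的前提下,用下面示意的 `ALTER SYSTEM` 语句完成同样的配置(修改后仍需重启实例才能生效):
+
+```sql
+-- 将 wal_level 写入 postgresql.auto.conf,重启实例后生效
+ALTER SYSTEM SET wal_level = 'logical';
+
+-- 重启后确认配置已生效
+SHOW wal_level;
+```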
+
+### decoderbufs
+
+decoderbufs 基于 Protobuf,由 Debezium 社区维护,安装方式请参考 [postgres-decoderbufs](https://github.com/debezium/postgres-decoderbufs)。
+
+1. 要在启动时加载插件,请将以下内容添加到 postgresql.conf 文件中:
+
+```properties
+shared_preload_libraries = 'decoderbufs'
+```
+2. 请在 postgresql.conf 文件中指定以下内容:
+
+```properties
+wal_level = logical 
+```
+
+## 如何创建 PostgreSQL Extract 节点
+
+### SQL API 用法
+
+```sql
+CREATE TABLE `postgresTable`(
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'postgres-cdc',
+  'hostname' = 'localhost',
+  'username' = 'postgres',
+  'password' = 'inlong',
+  'database-name' = 'postgres',
+  'schema-name' = 'public',
+  'port' = '5432',
+  'table-name' = 'user',
+  'decoding.plugin.name' = 'pgoutput',
+  'slot.name' = 'feaafacbaddadc'
+)
+```
+
+### InLong Dashboard 用法
+
+TODO: 将在未来支持此功能。
+
+### Usage for InLong Manager Client
+
+TODO: 将在未来支持此功能。
+
+## PostgreSQL Extract 节点参数
+
+| 参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
+|---------|----------|---------|------|------------|
+| connector | 必选 | (none) | String | 指定使用的连接器,此处应为 `postgres-cdc`。|
+| hostname | 必选 | (none) | String | PostgreSQL 数据库的 IP 地址或者主机名 |
+| username | 必选 | (none) | String | 连接到 PostgreSQL 数据库服务器时使用的用户名。 |
+| password | 必选 | (none) | String | 连接到 PostgreSQL 数据库服务器时使用的密码。 |
+| database-name | 必选 | (none) | String | 要监控的 PostgreSQL 数据库的数据库名称。 |
+| schema-name | 必选 | (none) | String | 要监控的 PostgreSQL 数据库的模式名称。|
+| table-name | 必选 | (none) | String | 要监控的 PostgreSQL 数据库的表名。 |
+| port | 可选 | 5432 | Integer | PostgreSQL 数据库服务器的整数端口号。 |
+| decoding.plugin.name | 可选 | decoderbufs | String | 服务器上安装的 Postgres 逻辑解码插件的名称。 支持的值是 decoderbufs、wal2json、wal2json_rds、wal2json_streaming、wal2json_rds_streaming 和 pgoutput。 |
+| slot.name | 可选 | flink | String | PostgreSQL 逻辑解码槽的名称,它是为从特定数据库/模式的特定插件流式传输更改而创建的。 服务器使用此插槽将事件流式传输到您正在配置的连接器。 插槽名称必须符合 PostgreSQL 复制插槽命名规则,其中规定:“每个复制插槽都有一个名称,可以包含小写字母、数字和下划线字符。” |
+| debezium.* | 可选 | (none) | String | 将 Debezium 的属性传递给用于从 Postgres 服务器捕获数据更改的 Debezium Embedded Engine。 例如:“debezium.snapshot.mode”=“never”。 查看更多关于 [Debezium 的 Postgres 连接器属性](https://debezium.io/documentation/reference/1.5/connectors/postgresql.html#postgresql-connector-properties)。 |
+
+**Note**: 建议为不同的表设置不同的 `slot.name`,以避免出现 PSQLException: ERROR: replication slot "flink" is active for PID 974 错误。  
+**Note**: 如果出现 PSQLException: ERROR: all replication slots are in use Hint: Free one or increase max_replication_slots 错误,可以先通过以下语句查询并删除不再使用的复制槽;槽被占用时的处理方式见后面的示例。  
+```sql
+SELECT * FROM pg_replication_slots;
+-- 假设查询到的插槽名称为 flink,将其删除
+SELECT pg_drop_replication_slot('flink');
+```
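+
+针对第一个 Note 中复制槽被其它会话占用(active for PID)的情况,可以参考下面的示意语句先确认占用该槽的进程,在确认安全后终止对应会话,然后再删除槽(槽名 flink 仅为示例,执行需要相应的数据库权限):
+
+```sql
+-- 查看复制槽是否处于活动状态以及占用它的进程号
+SELECT slot_name, active, active_pid FROM pg_replication_slots WHERE slot_name = 'flink';
+
+-- 确认安全后终止占用该槽的会话,之后即可删除该槽
+SELECT pg_terminate_backend(active_pid) FROM pg_replication_slots
+WHERE slot_name = 'flink' AND active;
+```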
+
+## 可用的元数据
+
+以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。
+
+| 字段名称 | 数据类型 | 描述  | 
+|-----|------------|-------------|
+| table_name | STRING NOT NULL | 包含该行的表的名称 |
+| schema_name | STRING NOT NULL | 包含该行的模式的名称 |
+| database_name | STRING NOT NULL | 包含该行的数据库的名称 |
+| op_ts | TIMESTAMP_LTZ(3) NOT NULL | 它指示在数据库中进行更改的时间。如果记录是从表的快照而不是更改流中读取的,则该值始终为 0。 |
+
+扩展的 CREATE TABLE 示例演示了使用这些元数据字段的语法:
+
+```sql
+CREATE TABLE postgresTable (
+    db_name STRING METADATA FROM 'database_name' VIRTUAL,
+    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
+    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    `name` STRING,
+    `age` INT
+) WITH (
+     'connector' = 'postgres-cdc',
+     'hostname' = 'localhost',
+     'username' = 'postgres',
+     'password' = 'inlong',
+     'database-name' = 'postgres',
+     'schema-name' = 'public',
+     'port' = '5432',
+     'table-name' = 'user',
+     'decoding.plugin.name' = 'pgoutput',
+     'slot.name' = 'feaafacbaddadc'
+);
+```
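+
+定义好上述虚拟列之后,就可以像普通字段一样在查询中使用这些元数据,例如(仅作示意):
+
+```sql
+-- 查询业务字段的同时读取来源库、来源表和变更时间
+SELECT db_name, table_name, operation_ts, `name`, `age`
+FROM postgresTable;
+```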
+
+## 数据类型映射
+
+| PostgreSQL 类型 | Flink SQL 类型 |
+|-----------------|----------------|
+|                 | TINYINT        |
+| SMALLINT <br/> INT2 <br/> SMALLSERIAL <br/> SERIAL2 | SMALLINT |
+| INTEGER <br/> SERIAL | INT |
+| BIGINT <br/> BIGSERIAL | BIGINT |
+| | DECIMAL(20, 0) |
+| REAL <br/> FLOAT4 | FLOAT |
+| FLOAT8 <br/> DOUBLE PRECISION| DOUBLE |
+| NUMERIC(p, s) <br/> DECIMAL(p, s) | DECIMAL(p, s) |
+| BOOLEAN | BOOLEAN |
+| DATE | DATE |
+| TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
+| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
+| CHAR(n) <br/> CHARACTER(n) <br/> VARCHAR(n) <br/> CHARACTER VARYING(n) <br/> TEXT | STRING |
+| BYTEA | BYTES |
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/clickhouse.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/clickhouse.md
index 24cd8c312..200ced7d9 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/clickhouse.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/clickhouse.md
@@ -3,10 +3,122 @@ title: ClickHouse
 sidebar_position: 5
 ---
 
-## 总览
-[ClickHouse](https://clickhouse.com/docs/en/intro/) 是一个用于联机分析(OLAP)的列式数据库管理系统(DBMS)。
+## ClickHouse Load 节点
 
-## 配置
-创建数据流时,数据流向选择 `ClickHouse`,并点击 ”添加“ 进行配置。
+`ClickHouse Load` 节点支持将数据写入 ClickHouse 数据库。 本文档介绍如何设置 ClickHouse Load 节点以对 ClickHouse 数据库运行 SQL 查询。
 
-![ClickHouse Configuration](img/clickhouse.png)
\ No newline at end of file
+## 支持的版本
+
+| Load 节点                | Driver | Group Id | Artifact Id | JAR |                                                                                                                                                                                                                                                                                                                                                                                       
+|--------------------------|--------|----------|-------------|-----|
+| [ClickHouse](./clickhouse.md) | ClickHouse  | ru.yandex.clickhouse | clickhouse-jdbc | [下载](https://mvnrepository.com/artifact/ru.yandex.clickhouse/clickhouse-jdbc) |
+
+## 依赖
+
+为了设置 ClickHouse Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。
+
+### Maven 依赖
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-jdbc</artifactId>
+    <!-- 选择适合你的 Application 的版本 -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## 如何创建 ClickHouse Load 节点
+
+### SQL API 用法
+
+```sql
+
+-- MySQL Extract 节点
+CREATE TABLE `mysql_extract_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/read',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+)
+
+-- ClickHouse Load 节点
+CREATE TABLE `clickhouse_load_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.ClickHouseDialect',
+  'url' = 'jdbc:clickhouse://localhost:8123/demo',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+)
+
+-- 写数据到 ClickHouse
+INSERT INTO clickhouse_load_table 
+SELECT id, name , age FROM mysql_extract_table;  
+
+```
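+
+如果需要调整攒批写入与重试行为,可以在 Load 节点的 WITH 子句中追加 `sink.buffer-flush.*`、`sink.max-retries` 等参数,参数含义见下文参数表,以下取值仅作示意,请按实际负载调整:
+
+```sql
+CREATE TABLE `clickhouse_load_table_tuned`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.ClickHouseDialect',
+  'url' = 'jdbc:clickhouse://localhost:8123/demo',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user',
+  -- 攒批与重试相关参数
+  'sink.buffer-flush.max-rows' = '1000',
+  'sink.buffer-flush.interval' = '5s',
+  'sink.max-retries' = '5'
+)
+```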
+
+### InLong Dashboard 用法
+
+创建数据流时,数据流方向选择`ClickHouse`,点击“添加”进行配置。
+
+![ClickHouse Configuration](img/clickhouse.png)
+
+### InLong Manager Client 用法
+
+TODO: 将在未来支持此功能。
+
+## ClickHouse Load 节点参数
+
+| 参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
+|---------|----------|---------|------|------------|
+| connector | 必选 | (none) | String | 指定使用什么类型的连接器,这里应该是 'jdbc-inlong'。 |
+| url | 必选 | (none) | String | JDBC 数据库 url。 |
+| dialect-impl | 必选 | (none) |  String | `org.apache.inlong.sort.jdbc.dialect.ClickHouseDialect` |
+| table-name | 必选 | (none) | String | 连接到 JDBC 表的名称。 |
+| driver | 可选 | (none) | String | 用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。 |
+| username | 可选 | (none) | String | JDBC 用户名。如果指定了 'username' 和 'password' 中的任一参数,则两者必须都被指定。 |
+| password | 可选 | (none) | String | JDBC 密码。 |
+| connection.max-retry-timeout | 可选 | 60s | Duration | 最大重试超时时间,以秒为单位且不应该小于 1 秒。 |
+| sink.buffer-flush.max-rows | 可选 | 100 | Integer | flush 前缓存记录的最大值,可以设置为 '0' 来禁用它。 |
+| sink.buffer-flush.interval | 可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
+| sink.max-retries | 可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
+| sink.parallelism | 可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
+
+## 数据类型映射
+
+| ClickHouse type | Flink SQL type |
+|-----------------|----------------|
+| String         | CHAR           |
+| String <br/> IP <br/> UUID | VARCHAR |
+| String <br/> EnumL | STRING |
+| UInt8 | BOOLEAN |
+| FixedString | BYTES |
+| Decimal <br/> Int128 <br/> Int256 <br/> UInt64 <br/> UInt128 <br/> UInt256 | DECIMAL |
+| Int8 | TINYINT |
+| Int16 <br/> UInt8 | SMALLINT |
+| Int32 <br/> UInt16 <br/> Interval | INTEGER |
+| Int64 <br/> UInt32 | BIGINT |
+| Float32 | FLOAT |
+| Date | DATE |
+| DateTime | TIME |
+| DateTime | TIMESTAMP |
+| DateTime | TIMESTAMP_LTZ |
+| Int32 | INTERVAL_YEAR_MONTH |
+| Int64 | INTERVAL_DAY_TIME |
+| Array | ARRAY |
+| Map | MAP |
+| Not supported | ROW |
+| Not supported | MULTISET |
+| Not supported | RAW |
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/greenplum.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/greenplum.md
index 79a2493d8..9e5df92b2 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/greenplum.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/greenplum.md
@@ -1,4 +1,114 @@
 ---
 title: Greenplum
 sidebar_position: 9
----
\ No newline at end of file
+---
+
+## Greenplum Load 节点
+
+`Greenplum Load` 节点支持将数据写入 Greenplum 数据库。 本文档介绍如何设置 Greenplum Load 节点以对 Greenplum 数据库运行 SQL 查询。
+
+## 支持的版本
+
+| Load 节点                | Driver | Group Id | Artifact Id | JAR |                                                                                                                                                                                                                                                                                                                                                                                       
+|--------------------------|--------|----------|-------------|-----|
+| [Greenplum](./greenplum.md) | PostgreSQL  | org.postgresql | postgresql | [下载](https://jdbc.postgresql.org/download.html) |
+
+## 依赖
+
+为了设置 Greenplum Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。
+
+### Maven 依赖
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-jdbc</artifactId>
+    <!-- 选择适合你的 Application 的版本 -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## 如何创建 Greenplum Load 节点
+
+### SQL API 用法
+
+```sql
+
+-- MySQL Extract 节点
+CREATE TABLE `mysql_extract_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/read',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+)
+
+-- Greenplum Load 节点
+CREATE TABLE `greenplum_load_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:postgresql://localhost:5432/write',
+  'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.GreenplumDialect',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'public.user'
+)
+
+-- 写数据到 Greenplum
+INSERT INTO greenplum_load_table 
+SELECT id, name , age FROM mysql_extract_table;  
+
+```
+
+### InLong Dashboard 用法
+
+TODO: 将在未来支持此功能。
+
+### InLong Manager Client 用法
+
+TODO: 将在未来支持此功能。
+
+## Greenplum Load 节点参数
+
+| 参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
+|---------|----------|---------|------|------------|
+| connector | 必选 | (none) | String | 指定使用什么类型的连接器,这里应该是 'jdbc-inlong'。 |
+| url | 必选 | (none) | String | JDBC 数据库 url。 |
+| dialect-impl | 必选 | (none) |  String | `org.apache.inlong.sort.jdbc.dialect.GreenplumDialect` |
+| table-name | 必选 | (none) | String | 连接到 JDBC 表的名称。 |
+| driver | 可选 | (none) | String | 用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。 |
+| username | 可选 | (none) | String | JDBC 用户名。如果指定了 'username' 和 'password' 中的任一参数,则两者必须都被指定。 |
+| password | 可选 | (none) | String | JDBC 密码。 |
+| connection.max-retry-timeout | 可选 | 60s | Duration | 最大重试超时时间,以秒为单位且不应该小于 1 秒。 |
+| sink.buffer-flush.max-rows | 可选 | 100 | Integer | flush 前缓存记录的最大值,可以设置为 '0' 来禁用它。 |
+| sink.buffer-flush.interval | 可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
+| sink.max-retries | 可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
+| sink.parallelism | 可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
+
+## 数据类型映射
+
+| Greenplum 类型 | Flink SQL 类型 |
+|-----------------|----------------|
+|                 | TINYINT        |
+| SMALLINT <br/> INT2 <br/> SMALLSERIAL <br/> SERIAL2 | SMALLINT |
+| INTEGER <br/> SERIAL | INT |
+| BIGINT <br/> BIGSERIAL | BIGINT |
+| | DECIMAL(20, 0) |
+| REAL <br/> FLOAT4 | FLOAT |
+| FLOAT8 <br/> DOUBLE PRECISION| DOUBLE |
+| NUMERIC(p, s) <br/> DECIMAL(p, s) | DECIMAL(p, s) |
+| BOOLEAN | BOOLEAN |
+| DATE | DATE |
+| TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
+| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
+| CHAR(n) <br/> CHARACTER(n) <br/> VARCHAR(n) <br/> CHARACTER VARYING(n) <br/> TEXT | STRING |
+| BYTEA | BYTES |
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hbase.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hbase.md
index c5d5d973d..bb871148d 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hbase.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hbase.md
@@ -1,4 +1,123 @@
 ---
 title: HBase
 sidebar_position: 10
----
\ No newline at end of file
+---
+
+## HBase Load 节点
+
+`HBase Load` 节点支持将数据写入 HBase 数据库。
+
+## 支持的版本
+
+| Load 节点                | HBase 版本 |                                                                                                                                                                                                                                                                                                                                                                                           
+|-------------------------|------------|
+| [HBase](./hbase.md)     | 2.2.x      |  
+
+## 依赖
+
+为了设置 HBase Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。
+
+
+### Maven 依赖
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-hbase</artifactId>
+    <!-- 选择适合你的 Application 的版本 -->
+    <version>inlong_version</version>
+</dependency>
+```
+## 如何创建 HBase Load 节点
+
+### SQL API 用法
+
+所有 HBase 表的列簇必须定义为 ROW 类型,字段名对应列簇名(column family),嵌套的字段名对应列限定符名(column qualifier)。用户只需在表结构中
+声明查询中使用的列簇和列限定符。除了 ROW 类型的列,剩下的原子数据类型字段(比如,STRING, BIGINT)将被识别为 HBase 的 rowkey,一张表中只能声明一个 
+rowkey。rowkey 字段的名字可以是任意的,如果是保留关键字,需要用反引号。  
+
+下面这个例子展示了如何用 `Flink SQL` 创建一个 HBase Load 节点:
+
+```sql
+-- 在 Flink SQL 中创建 HBase 表 'hbase_load_node'
+CREATE TABLE hbase_load_node (
+    rowkey STRING,
+    family1 ROW<q1 INT>,
+    family2 ROW<q2 STRING, q3 BIGINT>,
+    family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
+    PRIMARY KEY (rowkey) NOT ENFORCED
+) WITH (
+      'connector' = 'hbase-2.2',
+      'table-name' = 'mytable',
+      'zookeeper.quorum' = 'localhost:2181'
+);
+
+-- 使用 ROW(...) 构造函数构造列簇并写数据到 HBase 表。
+-- 假设表 "T" 的 schema 为 [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
+INSERT INTO hbase_load_node
+SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
+
+-- 从 HBase 表中扫描数据
+SELECT rowkey, family1, family3.q4, family3.q6 FROM hbase_load_node;
+
+-- 将 HBase 表作为维度表进行时态(temporal)关联
+SELECT * FROM myTopic
+LEFT JOIN hbase_load_node FOR SYSTEM_TIME AS OF myTopic.proctime
+ON myTopic.key = hbase_load_node.rowkey;
+```
+
+### InLong Dashboard 用法
+
+TODO: 将在未来支持此功能。
+
+### InLong Manager Client 用法
+
+TODO: 将在未来支持此功能。
+
+## HBase Load 节点参数
+
+| 参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
+|-----|---------|-------|---------|-----|
+| connector | 必选 | (none) | String | 指定使用的连接器: hbase-2.2: 连接 HBase 2.2.x 集群 |
+| table-name | 必选 | (none) | String | 连接的 HBase 表名。 |
+| zookeeper.quorum | 必选 | (none) | String | HBase Zookeeper quorum 信息。 |
+| zookeeper.znode.parent | 可选 | /hbase | String | HBase 集群的 Zookeeper 根目录。|
+| null-string-literal | 可选 | null | String | 当字符串值为 null 时的存储形式,默认存成 "null" 字符串。HBase 的 source 和 sink 在编解码时,会将所有数据类型(字符串除外)的 null 值存储为空字节。 |
+| sink.buffer-flush.max-size | 可选 | 2mb | MemorySize | 写入的参数选项。每次写入请求缓存行的最大大小。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。 |
+| sink.buffer-flush.max-rows | 可选 | 1000 | Integer | 写入的参数选项。 每次写入请求缓存的最大行数。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。 |
+| sink.buffer-flush.interval | 可选 | 1s | Duration | 写入的参数选项。刷写缓存行的间隔。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。注意:将 "sink.buffer-flush.max-size" 和 "sink.buffer-flush.max-rows" 设置为 "0" 并配置适当的刷写间隔,可以完全异步地刷写缓存行。 |
+| sink.parallelism | 可选 | (none) | Integer | 为 HBase sink operator 定义并行度。默认情况下,并行度由框架决定,和链在一起的上游 operator 一样。 |
+| lookup.async | 可选 | false | Boolean | 是否启用异步查找。如果为真,查找将是异步的。注意:异步方式只支持 hbase-2.2 连接器 |
+| lookup.cache.max-rows | 可选 | (none) | Integer | 查找缓存的最大行数,超过这个值,最旧的行将过期。注意:"lookup.cache.max-rows" 和 "lookup.cache.ttl" 必须同时被设置。默认情况下,查找缓存是禁用的。 |
+| lookup.cache.ttl | 可选 | (none) | Duration | 查找缓存中每一行的最大生存时间,在这段时间内,最老的行将过期。注意:"lookup.cache.max-rows" 和 "lookup.cache.ttl" 必须同时被设置。默认情况下,查找缓存是禁用的。 |
+| lookup.max-retries | 可选 | 3 | Integer | 查找数据库失败时的最大重试次数。 |
+| properties.* | 可选 | (none) | String | 可以设置任意 HBase 的配置项。后缀名必须匹配在 [HBase 配置文档](https://hbase.apache.org/2.3/book.html#hbase_default_configurations) 中定义的配置键。Flink 将移除 "properties." 配置键前缀并将变换后的配置键和值传入底层的 HBase 客户端。 例如您可以设置 'properties.hbase.security.authentication' = 'kerberos' 等kerberos认证参数。 |
+
+## 数据类型映射
+
+HBase 以字节数组存储所有数据。在读和写过程中要序列化和反序列化数据。
+
+Flink 的 HBase 连接器利用 HBase(Hadoop) 的工具类 org.apache.hadoop.hbase.util.Bytes 进行字节数组和 Flink 数据类型转换。
+
+Flink 的 HBase 连接器将所有数据类型(字符串除外)的 null 值编码成空字节。对于字符串类型,null 值的字面值由 `null-string-literal` 选项值决定。
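+
+例如,下面的建表语句示意了如何自定义字符串 null 值的字面量,以及作为维表使用时的查询缓存配置(参数含义见上文参数表,取值仅作示意):
+
+```sql
+CREATE TABLE hbase_dim_table (
+    rowkey STRING,
+    family1 ROW<q1 INT>,
+    PRIMARY KEY (rowkey) NOT ENFORCED
+) WITH (
+    'connector' = 'hbase-2.2',
+    'table-name' = 'mytable',
+    'zookeeper.quorum' = 'localhost:2181',
+    -- 字符串类型的 null 值将以 'n/a' 字面量存储
+    'null-string-literal' = 'n/a',
+    -- 维表查询缓存,两个参数必须同时设置
+    'lookup.cache.max-rows' = '10000',
+    'lookup.cache.ttl' = '10min'
+);
+```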
+
+数据类型映射表如下:
+
+| Flink SQL 类型 | HBase 转换 |
+|-----------------|-----------------|
+| CHAR <br/> VARCHAR <br/> STRING | byte[] toBytes(String s) <br/> String toString(byte[] b) |
+| BOOLEAN | byte[] toBytes(boolean b) <br/> boolean toBoolean(byte[] b) |
+| BINARY <br/> VARBINARY | Returns byte[] as is. |
+| DECIMAL | byte[] toBytes(BigDecimal v) <br/> BigDecimal toBigDecimal(byte[] b) |
+| TINYINT | new byte[] { val } <br/> bytes[0] // returns first and only byte from bytes |
+| SMALLINT | byte[] toBytes(short val) <br/> short toShort(byte[] bytes) |
+| INT | byte[] toBytes(int val) <br/> int toInt(byte[] bytes) |
+| BIGINT | byte[] toBytes(long val) <br/> long toLong(byte[] bytes) |
+| FLOAT | byte[] toBytes(float val) <br/> float toFloat(byte[] bytes) |
+| DOUBLE | byte[] toBytes(double val) <br/> double toDouble(byte[] bytes) |
+| DATE | Stores the number of days since epoch as int value. |
+| TIME | Stores the number of milliseconds of the day as int value. |
+| TIMESTAMP | Stores the milliseconds since epoch as long value. |
+| ARRAY | Not supported |
+| MAP <br/> MULTISET | Not supported |
+| ROW | Not supported |
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md
index 56787dfb6..564cb1c35 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md
@@ -3,7 +3,99 @@ title: Kafka
 sidebar_position: 4
 ---
 
-## Configuration
-创建数据流时,数据流向选择 `Kafka`,并点击 ”添加“ 进行配置。
+## Kafka Load Node
 
-![Kafka Configuration](img/kafka.png)
\ No newline at end of file
+`Kafka Load` 节点支持写数据到 Kafka topics。 它支持以普通的方式写入数据和 Upsert 的方式写入数据。 upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。
+
+## 支持的版本
+
+| Load 节点                | Kafka 版本 |                                                                                                                                                                                                                                                                                                                                                                                           
+|--------------------------|---------------|
+| [Kafka](./kafka.md)      | 0.10+         |  
+
+## 依赖
+
+为了设置 Kafka Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。
+
+### Maven 依赖
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-kafka</artifactId>
+    <!-- 选择适合你的 Application 的版本 -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## 如何创建 Kafka Load 节点
+
+### SQL API 用法
+
+下面这个例子展示了如何用 `Flink SQL` 创建一个 Kafka Load 节点:
+* 连接器是 `kafka-inlong`
+```sql
+-- 使用 Flink SQL 创建 Kafka 表 'kafka_load_node'
+Flink SQL> CREATE TABLE kafka_load_node (
+           `id` INT,
+           `name` STRING
+           ) WITH (
+           'connector' = 'kafka-inlong',
+           'topic' = 'user',
+           'properties.bootstrap.servers' = 'localhost:9092',
+           'properties.group.id' = 'testGroup',
+           'format' = 'csv'
+           )
+```
+* 连接器是 `upsert-kafka`
+```sql
+-- 使用 Flink SQL 创建 Kafka 表 'kafka_load_node'
+Flink SQL> CREATE TABLE kafka_load_node (
+          `id` INT,
+          `name` STRING,
+           PRIMARY KEY (`id`) NOT ENFORCED
+          ) WITH (
+          'connector' = 'upsert-kafka',
+          'topic' = 'user',
+          'properties.bootstrap.servers' = 'localhost:9092',
+          'key.format' = 'csv',
+          'value.format' = 'csv'
+          )   
+```
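+
+定义好上面的 upsert-kafka 表后,即可将上游 changelog(例如 MySQL-CDC Extract 节点,源表定义此处从略)直接写入该表:INSERT/UPDATE_AFTER 会写成普通消息,DELETE 会写成 value 为空的墓碑消息。示意如下:
+
+```sql
+-- 将 changelog 以 upsert 方式写入 Kafka(仅作示意)
+INSERT INTO kafka_load_node
+SELECT `id`, `name` FROM mysql_extract_table;
+```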
+### InLong Dashboard 用法
+
+创建数据流时,数据流方向选择`Kafka`,点击“添加”进行配置。
+
+![Kafka Configuration](img/kafka.png)
+
+### InLong Manager Client 用法
+
+TODO: 将在未来支持此功能。
+
+## Kafka Load 节点参数
+
+| 参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
+|---------|----------|---------|------|------------|
+| connector | 必选 | (none) | String | 指定要使用的连接器  1. Upsert Kafka 连接器使用: `upsert-kafka`  2. Kafka连接器使用: `kafka-inlong` |
+| topic | 必选 | (none) | String | 当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 `topic-1;topic-2`。注意,对 source 表而言,`topic` 和 `topic-pattern` 两个选项只能使用其中一个。 |
+| properties.bootstrap.servers | 必选 | (none) | String | 逗号分隔的 Kafka broker 列表。 |
+| properties.* | 可选 | (none) | String | 可以设置和传递任意 Kafka 的配置项。后缀名必须匹配在 [Kafka 配置文档](https://kafka.apache.org/documentation/#configuration) 中定义的配置键。Flink 将移除 "properties." 配置键前缀并将变换后的配置键和值传入底层的 Kafka 客户端。例如,你可以通过 'properties.allow.auto.create.topics' = 'false' 来禁用 topic 的自动创建。但是某些配置项不支持进行配置,因为 Flink 会覆盖这些配置,例如 'key.deserializer' 和 'value.deserializer'。 |
+| format | 对于 Kafka 必选 | (none) | String | 用来序列化或反序列化 Kafka 消息的格式。 请参阅 [格式](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/) 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 'value.format' 二者必需其一。 |
+| key.format | 可选 | (none) | String | 用来序列化和反序列化 Kafka 消息键(Key)的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:如果定义了键格式,则配置项 'key.fields' 也是必需的。 否则 Kafka 记录将使用空值作为键。 |
+| key.fields | 可选 | [] | `List<String>` | 表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。 列表格式为 'field1;field2'。 |
+| key.fields-prefix | 可选 | (none) | String | 为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 'key.fields' 都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 请注意该配置项要求必须将 'value.fields-include' 配置为 'EXCEPT_KEY'。 |
+| value.format | 对于 upsert-kafka 必选 | (none) | String | 用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 'csv'、'json'、'avro'。请参考[格式](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/overview/) 页面以获取更多详细信息和格式参数。 |
+| value.fields-include | 可选 | ALL | Enum Possible values: [ALL, EXCEPT_KEY]| 控制哪些字段应该出现在 value 中。可取值:<br/> ALL:消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。<br/> EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。 |
+| sink.partitioner | 可选 | 'default' | String | Flink partition 到 Kafka partition 的分区映射关系,可选值有:<br/>default:使用 Kafka 默认的分区器对消息进行分区。<br/>fixed:每个 Flink partition 最终对应最多一个 Kafka partition。<br/>round-robin:Flink partition 按轮循(round-robin)的模式对应到 Kafka partition。只有当未指定消息的消息键时生效。<br/>自定义 FlinkKafkaPartitioner 的子类:例如 'org.mycompany.MyPartitioner'。请参阅 [Sink 分区](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/kafka/#sink-%E5%88%86%E5%8C%BA) 以获取更多细节。 |
+| sink.semantic | 可选 | at-least-once | String | 定义 Kafka sink 的语义。有效值为 'at-least-once','exactly-once' 和 'none'。请参阅 [一致性保证](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/kafka/#%E4%B8%80%E8%87%B4%E6%80%A7%E4%BF%9D%E8%AF%81) 以获取更多细节。 |
+| sink.parallelism | 可选 | (none) | Integer | 定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。 |
+
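+下面的例子示意了如何结合上表中的 `key.format`、`key.fields` 与 `value.fields-include` 参数,将 `id` 字段同时作为消息键写入 Kafka(取值仅作示意):
+
+```sql
+CREATE TABLE kafka_load_node_with_key (
+  `id` INT,
+  `name` STRING
+) WITH (
+  'connector' = 'kafka-inlong',
+  'topic' = 'user',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  -- id 字段写入消息键,value 中不再重复包含该字段
+  'key.format' = 'json',
+  'key.fields' = 'id',
+  'value.format' = 'json',
+  'value.fields-include' = 'EXCEPT_KEY'
+)
+```
+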
+## 可用的元数据字段
+
+支持为 `canal-json-inlong` 格式写入元数据。
+
+参考 [Kafka Extract Node](../extract_node/kafka.md) 关于元数据的列表。
+
+## 数据类型映射
+
+Kafka 将消息键值以二进制进行存储,因此 Kafka 并不存在 schema 或数据类型。Kafka 消息使用格式配置进行序列化和反序列化,例如 csv,json,avro。 因此,数据类型映射取决于使用的格式。请参阅 [格式](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/overview/) 页面以获取更多细节。
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/mysql.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/mysql.md
index 813cfcb29..1c292daaf 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/mysql.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/mysql.md
@@ -1,4 +1,115 @@
 ---
 title: MySQL
 sidebar_position: 12
----
\ No newline at end of file
+---
+
+## MySQL Load 节点
+
+`MySQL Load` 节点支持将数据写入 MySQL 数据库。 本文档介绍如何设置 MySQL Load 节点以对 MySQL 数据库运行 SQL 查询。
+
+## 支持的版本
+
+| Load 节点                | Driver | Group Id | Artifact Id | JAR |                                                                                                                                                                                                                                                                                                                                                                                       
+|--------------------------|--------|----------|-------------|-----|
+| [MySQL](./mysql.md)      | MySQL  | mysql    | mysql-connector-java | [下载](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
+
+## 依赖
+
+为了设置 MySQL Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。
+
+### Maven 依赖
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-jdbc</artifactId>
+    <!-- 选择适合你的 Application 的版本 -->
+    <version>inlong_version</version>
+</dependency>
+```
+MySQL license 与 InLong license 存在冲突,因此我们在 pom 中移除了 MySQL 驱动依赖。如果需要使用该连接器,请自行修改 pom 文件,加入 MySQL 驱动依赖。
+
+
+## 如何创建 MySQL Load 节点
+
+### SQL API 用法
+
+```sql
+
+-- MySQL Extract 节点
+CREATE TABLE `mysql_extract_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/read',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+)
+
+-- MySQL Load 节点
+CREATE TABLE `mysql_load_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/write',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+)
+
+-- 写数据到 MySQL
+INSERT INTO mysql_load_table 
+SELECT id, name , age FROM mysql_extract_table;  
+
+```
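+
+如果无法从 URL 自动推导驱动,或者希望显式指定驱动类,可以在 WITH 子句中增加 `driver` 参数,下面以 MySQL Connector/J 8 的驱动类名为例(仅作示意):
+
+```sql
+CREATE TABLE `mysql_load_table_with_driver`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/write',
+  -- 显式指定 JDBC 驱动类名,不设置时会从 URL 自动推导
+  'driver' = 'com.mysql.cj.jdbc.Driver',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+)
+```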
+
+### InLong Dashboard 用法
+
+TODO: 将在未来支持此功能。
+
+### InLong Manager Client 用法
+
+TODO: 将在未来支持此功能。
+
+## MySQL Load 节点参数
+
+| 参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
+|---------|----------|---------|------|------------|
+| connector | 必选 | (none) | String | 指定使用什么类型的连接器,这里应该是 'jdbc-inlong'。 |
+| url | 必选 | (none) | String | JDBC 数据库 url。 |
+| table-name | 必选 | (none) | String | 连接到 JDBC 表的名称。 |
+| driver | 可选 | (none) | String | 用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。 |
+| username | 可选 | (none) | String | JDBC 用户名。如果指定了 'username' 和 'password' 中的任一参数,则两者必须都被指定。 |
+| password | 可选 | (none) | String | JDBC 密码。 |
+| connection.max-retry-timeout | 可选 | 60s | Duration | 最大重试超时时间,以秒为单位且不应该小于 1 秒。 |
+| sink.buffer-flush.max-rows | 可选 | 100 | Integer | flush 前缓存记录的最大值,可以设置为 '0' 来禁用它。 |
+| sink.buffer-flush.interval | 可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
+| sink.max-retries | 可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
+| sink.parallelism | 可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
+
+## 数据类型映射
+
+| MySQL 类型 | Flink SQL 类型 |
+|------------|----------------|
+| TINYINT | TINYINT |
+| SMALLINT <br/> TINYINT UNSIGNED| SMALLINT |
+| INT <br/> MEDIUMINT <br/> SMALLINT UNSIGNED | INT |
+| BIGINT <br/> INT UNSIGNED | BIGINT |
+| BIGINT UNSIGNED | DECIMAL(20, 0) |
+| FLOAT | FLOAT |
+| DOUBLE <br/> DOUBLE PRECISION | DOUBLE |
+| NUMERIC(p, s) <br/> DECIMAL(p, s) | DECIMAL(p, s) |
+| BOOLEAN <br/> TINYINT(1) | BOOLEAN |
+| DATE | DATE |
+| TIME [(p)] | TIME [(p)] [WITHOUT TIMEZONE] |
+| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
+| CHAR(n) <br/> VARCHAR(n) <br/> TEXT | STRING |
+| BINARY <br/> VARBINARY <br/> BLOB | BYTES |
+|  |  ARRAY |
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/oracle.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/oracle.md
index d08324620..fa372ac3d 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/oracle.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/oracle.md
@@ -1,4 +1,113 @@
 ---
 title: Oracle
 sidebar_position: 13
----
\ No newline at end of file
+---
+
+## Oracle Load 节点
+
+`Oracle Load` 节点支持将数据写入 Oracle 数据库。 本文档介绍如何设置 Oracle Load 节点以对 Oracle 数据库运行 SQL 查询。
+
+## 支持的版本
+
+| Load 节点                | Driver | Group Id | Artifact Id | JAR |                                                                                                                                                                                                                                                                                                                                                                                       
+|--------------------------|--------|----------|-------------|-----|
+| [Oracle](./oracle.md) |  Oracle | com.oracle.database.jdbc | ojdbc8 | [下载](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
+
+## 依赖
+
+为了设置 Oracle Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。
+
+### Maven 依赖
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-jdbc</artifactId>
+    <!-- 选择适合你的 Application 的版本 -->
+    <version>inlong_version</version>
+</dependency>
+```
+Oracle license 与 InLong license 存在冲突,因此我们在 pom 中移除了 Oracle 驱动依赖。如果需要使用该连接器,请自行修改 pom 文件,加入 Oracle 驱动依赖。
+
+## 如何创建 Oracle Load 节点
+
+### SQL API 用法
+
+```sql
+
+-- MySQL Extract 节点
+CREATE TABLE `mysql_extract_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/read',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+)
+
+-- Oracle Load 节点
+CREATE TABLE `oracle_load_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:oracle:thin://host:port/database/',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'public.user'
+)
+
+-- 写数据到 Oracle
+INSERT INTO oracle_load_table 
+SELECT id, name , age FROM mysql_extract_table;  
+
+```
+
+### InLong Dashboard 用法
+
+TODO: 将在未来支持此功能。
+
+### InLong Manager Client 用法
+
+TODO: 将在未来支持此功能。
+
+## Oracle Load 节点参数
+
+| 参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
+|---------|----------|---------|------|------------|
+| connector | 必选 | (none) | String | 指定使用什么类型的连接器,这里应该是 'jdbc-inlong'。 |
+| url | 必选 | (none) | String | JDBC 数据库 url。 |
+| table-name | 必选 | (none) | String | 连接到 JDBC 表的名称。 |
+| driver | 可选 | (none) | String | 用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。 |
+| username | 可选 | (none) | String | JDBC 用户名。如果指定了 'username' 和 'password' 中的任一参数,则两者必须都被指定。 |
+| password | 可选 | (none) | String | JDBC 密码。 |
+| connection.max-retry-timeout | 可选 | 60s | Duration | 最大重试超时时间,以秒为单位且不应该小于 1 秒。 |
+| sink.buffer-flush.max-rows | 可选 | 100 | Integer | flush 前缓存记录的最大值,可以设置为 '0' 来禁用它。 |
+| sink.buffer-flush.interval | 可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
+| sink.max-retries | 可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
+| sink.parallelism | 可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
+
+## 数据类型映射
+
+| Oracle 类型 | Flink SQL 类型 |
+|-----------------|----------------|
+| BINARY_FLOAT    | FLOAT        |
+| BINARY_DOUBLE   | DOUBLE |
+| SMALLINT <br/> FLOAT(s) <br/> DOUBLE PRECISION <br/> REAL <br/> NUMBER(p, s) | DECIMAL(p, s) |
+| DATE | DATE |
+| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
+| CHAR(n) <br/> VARCHAR(n) <br/> CLOB(n) | STRING |
+| RAW(s) <br/> BLOB | BYTES |
+|  | ARRAY |
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/overview.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/overview.md
index fab832a9d..dc6e1ef6a 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/overview.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/overview.md
@@ -10,19 +10,19 @@ Load 节点列表是一组基于 <a href="https://flink.apache.org/">Apache Flin
 ## 支持的 Load 节点列表
 | Load 节点                                 | 版本                                                                                                                                                                                                                                                                                                                                                                                                     | 驱动包                     |
 |-----------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------|
-| [kafka](kafka.md)                       | [Kafka](https://kafka.apache.org/): 0.10+                                                                                                                                                                                                                                                                                                                                                              | None                    |
-| [hbase](hbase.md)                       | [Hbase](https://hbase.apache.org/): 2.2.x                                                                                                                                                                                                                                                                                                                                                              | None                    |
-| [postgresql](postgresql.md)             | [PostgreSQL](https://www.postgresql.org/): 9.6, 10, 11, 12                                                                                                                                                                                                                                                                                                                                             | None                    |
-| [oracle](oracle.md)                     | [Oracle](https://www.oracle.com/index.html): 11, 12, 19                                                                                                                                                                                                                                                                                                                                                | Oracle Driver: 19.3.0.0 |
-| [mysql](mysql.md)                       | [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <br/> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <br/> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <br/> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <br/> [MariaDB](https://mariadb.org): 10.x <br/> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.21     |
-| [tdsql-postgresql](tdsql-postgresql.md) | [TDSQL-PostgreSQL](https://cloud.tencent.com/document/product/1129): 10.17                                                                                                                                                                                                                                                                                                                             | Oracle Driver: 19.3.0.0 |
-| [greenplum](greenplum.md)               | [Greeplum](https://greenplum.org/): 4.x, 5.x, 6.x                                                                                                                                                                                                                                                                                                                                                      | None                    |
-| [elasticsearch](elasticsearch.md)       | [Elasticsearch](https://www.elastic.co/): 6.x, 7.x                                                                                                                                                                                                                                                                                                                                                     | None                    |
-| [clickhouse](clickhouse.md)             | [ClickHouse](https://clickhouse.com/): 20.7+                                                                                                                                                                                                                                                                                                                                                           | None                    |
-| [hive](hive.md)                         | [Hive](https://hive.apache.org/): 1.x, 2.x, 3.x                                                                                                                                                                                                                                                                                                                                                        | None                    |
-| [sqlserver](sqlserver.md)               | [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019                                                                                                                                                                                                                                                                                                                        | None                    |
-| [hdfs](hdfs.md)                         | [HDFS](https://hadoop.apache.org/): 2.x, 3.x                                                                                                                                                                                                                                                                                                                                                           | None                    |
-| [iceberg](iceberg.md)                   | [Iceberg](https://iceberg.apache.org/): 0.13.1+                                                                                                                                                                                                                                                                                                                                                        | None                    |
+| [Kafka](kafka.md)                       | [Kafka](https://kafka.apache.org/): 0.10+                                                                                                                                                                                                                                                                                                                                                              | None                    |
+| [HBase](hbase.md)                       | [Hbase](https://hbase.apache.org/): 2.2.x                                                                                                                                                                                                                                                                                                                                                              | None                    |
+| [PostgreSQL](postgresql.md)             | [PostgreSQL](https://www.postgresql.org/): 9.6, 10, 11, 12                                                                                                                                                                                                                                                                                                                                             | JDBC Driver: 42.2.12    |
+| [Oracle](oracle.md)                     | [Oracle](https://www.oracle.com/index.html): 11, 12, 19                                                                                                                                                                                                                                                                                                                                                | Oracle Driver: 19.3.0.0 |
+| [MySQL](mysql.md)                       | [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <br/> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <br/> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <br/> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <br/> [MariaDB](https://mariadb.org): 10.x <br/> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.21     |
+| [TDSQL-PostgreSQL](tdsql-postgresql.md) | [TDSQL-PostgreSQL](https://cloud.tencent.com/document/product/1129): 10.17                                                                                                                                                                                                                                                                                                                             | JDBC Driver: 42.2.12    |
+| [Greenplum](greenplum.md)               | [Greenplum](https://greenplum.org/): 4.x, 5.x, 6.x                                                                                                                                                                                                                                                                                                                                                     | JDBC Driver: 42.2.12    |
+| [Elasticsearch](elasticsearch.md)       | [Elasticsearch](https://www.elastic.co/): 6.x, 7.x                                                                                                                                                                                                                                                                                                                                                     | None                    |
+| [ClickHouse](clickhouse.md)             | [ClickHouse](https://clickhouse.com/): 20.7+                                                                                                                                                                                                                                                                                                                                                           | JDBC Driver: 0.3.1      |
+| [Hive](hive.md)                         | [Hive](https://hive.apache.org/): 1.x, 2.x, 3.x                                                                                                                                                                                                                                                                                                                                                        | None                    |
+| [SQLServer](sqlserver.md)               | [SQLServer](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019                                                                                                                                                                                                                                                                                                                        | JDBC Driver: 7.2.2.jre8 |
+| [HDFS](hdfs.md)                         | [HDFS](https://hadoop.apache.org/): 2.x, 3.x                                                                                                                                                                                                                                                                                                                                                           | None                    |
+| [Iceberg](iceberg.md)                   | [Iceberg](https://iceberg.apache.org/): 0.13.1+                                                                                                                                                                                                                                                                                                                                                        | None                    |
 
 
 ## 支持的 Flink 版本列表
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/postgresql.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/postgresql.md
index bcdacf590..bf6dfc320 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/postgresql.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/postgresql.md
@@ -1,4 +1,113 @@
 ---
 title: PostgreSQL
 sidebar_position: 14
----
\ No newline at end of file
+---
+
+## PostgreSQL Load 节点
+
+`PostgreSQL Load` 节点支持将数据写入 PostgreSQL 数据库。 本文档介绍如何设置 PostgreSQL Load 节点以对 PostgreSQL 数据库运行 SQL 查询。
+
+
+## 支持的版本
+
+| Load 节点                | Driver | Group Id | Artifact Id | JAR |                                                                                                                                                                                                                                                                                                                                                                                       
+|--------------------------|--------|----------|-------------|-----|
+| [PostgreSQL](./postgresql.md) | PostgreSQL  | org.postgresql | postgresql | [下载](https://jdbc.postgresql.org/download.html) |
+
+## 依赖
+
+为了设置 PostgreSQL Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。
+
+### Maven 依赖
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-jdbc</artifactId>
+    <!-- 选择适合你的 Application 的版本 -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## 如何创建 PostgreSQL Load 节点
+
+### SQL API 用法
+
+```sql
+
+-- MySQL Extract 节点
+CREATE TABLE `mysql_extract_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/read',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+)
+
+-- PostgreSQL Load 节点
+CREATE TABLE `postgresql_load_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc',
+  'url' = 'jdbc:postgresql://localhost:5432/write',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'public.user'
+)
+
+-- 写数据到 PostgreSQL
+INSERT INTO postgresql_load_table 
+SELECT id, name , age FROM mysql_extract_table;  
+
+```
+
+### InLong Dashboard 用法
+
+TODO: 将在未来支持此功能。
+
+### InLong Manager Client 用法
+
+TODO: 将在未来支持此功能。
+
+## PostgreSQL Load 节点参数
+
+| 参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
+|---------|----------|---------|------|------------|
+| connector | 必选 | (none) | String | 指定使用什么类型的连接器,这里应该是 'jdbc'。 |
+| url | 必选 | (none) | String | JDBC 数据库 url。 |
+| table-name | 必选 | (none) | String | 连接到 JDBC 表的名称。 |
+| driver | 可选 | (none) | String | 用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。 |
+| username | 可选 | (none) | String | JDBC 用户名。如果指定了 'username' 和 'password' 中的任一参数,则两者必须都被指定。 |
+| password | 可选 | (none) | String | JDBC 密码。 |
+| connection.max-retry-timeout | 可选 | 60s | Duration | 最大重试超时时间,以秒为单位且不应该小于 1 秒。 |
+| sink.buffer-flush.max-rows | 可选 | 100 | Integer | flush 前缓存记录的最大值,可以设置为 '0' 来禁用它。 |
+| sink.buffer-flush.interval | 可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
+| sink.max-retries | 可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
+| sink.parallelism | 可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
+
+## 数据类型映射
+
+| PostgreSQL 类型 | Flink SQL 类型 |
+|-----------------|----------------|
+|                 | TINYINT        |
+| SMALLINT <br/> INT2 <br/> SMALLSERIAL <br/> SERIAL2 | SMALLINT |
+| INTEGER <br/> SERIAL | INT |
+| BIGINT <br/> BIGSERIAL | BIGINT |
+| | DECIMAL(20, 0) |
+| REAL <br/> FLOAT4 | FLOAT |
+| FLOAT8 <br/> DOUBLE PRECISION| DOUBLE |
+| NUMERIC(p, s) <br/> DECIMAL(p, s) | DECIMAL(p, s) |
+| BOOLEAN | BOOLEAN |
+| DATE | DATE |
+| TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
+| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
+| CHAR(n) <br/> CHARACTER(n) <br/> VARCHAR(n) <br/> CHARACTER VARYING(n) <br/> TEXT | STRING |
+| BYTEA | BYTES |
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/sqlserver.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/sqlserver.md
index d44335d9b..459b6f124 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/sqlserver.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/sqlserver.md
@@ -1,4 +1,113 @@
 ---
-title: SqlServer
+title: SQLServer
 sidebar_position: 15
----
\ No newline at end of file
+---
+
+## SQLServer Load Node
+
+The `SQLServer Load` node supports writing data to the SQLServer database. This document describes how to set up the SQLServer Load node to run SQL queries against a SQLServer database.
+
+## Supported Version
+
+| Load Node                | Driver | Group Id | Artifact Id | JAR |
+|--------------------------|--------|----------|-------------|-----|
+| [SQLServer](./sqlserver.md) | SQL Server  | com.microsoft.sqlserver | mssql-jdbc | [Download](https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc) |
+
+## Dependencies
+
+To set up the SQLServer Load node, the following provides dependency information for projects using a build automation tool (such as Maven or SBT) and for SQL Client with Sort Connector JAR bundles.
+
+### Maven Dependency
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-jdbc</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## How to create a SQLServer Load Node
+
+### Usage for SQL API
+
+```sql
+
+-- MySQL Extract Node
+CREATE TABLE `mysql_extract_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/read',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+)
+
+-- SQLServer Load Node
+CREATE TABLE `sqlserver_load_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:sqlserver://localhost:1433;databaseName=column_type_test',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'dbo.work1'
+)
+
+-- Write data into SQLServer
+INSERT INTO sqlserver_load_table
+SELECT id, name, age FROM mysql_extract_table;
+
+```
+
+### Usage for InLong Dashboard
+
+TODO: It will be supported in the future.
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## SQLServer Load Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify which connector to use, here it should be 'jdbc-inlong'. |
+| url | required | (none) | String | The JDBC database URL. |
+| table-name | required | (none) | String | The name of the JDBC table to connect to. |
+| driver | optional | (none) | String | The class name of the JDBC driver used to connect to this URL; if not set, it is automatically derived from the URL. |
+| username | optional | (none) | String | The JDBC username. 'username' and 'password' must both be specified if either of them is specified. |
+| password | optional | (none) | String | The JDBC password. |
+| connection.max-retry-timeout | optional | 60s | Duration | Maximum timeout between retries; the timeout should be in second granularity and must not be smaller than 1 second. |
+| sink.buffer-flush.max-rows | optional | 100 | Integer | The maximum number of buffered records before flush; can be set to '0' to disable it. |
+| sink.buffer-flush.interval | optional | 1s | Duration | The flush interval, after which an asynchronous thread flushes the buffered data; can be set to '0' to disable it. Note that to process buffered flush events fully asynchronously, set 'sink.buffer-flush.max-rows' to '0' and configure an appropriate flush interval. |
+| sink.max-retries | optional | 3 | Integer | The maximum number of retries if writing records to the database fails. |
+| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator. |
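+
+As a minimal sketch of how these optional sink settings fit together, the node from the example above could also be declared with explicit buffering and retry options (the table name and option values below are illustrative assumptions, not recommendations):
+
+```sql
+-- Hypothetical SQLServer Load node with explicit sink tuning options;
+-- all option values here are illustrative only.
+CREATE TABLE `sqlserver_load_table_tuned`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:sqlserver://localhost:1433;databaseName=column_type_test',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'dbo.work1',
+  'sink.buffer-flush.max-rows' = '200',  -- flush after 200 buffered rows ...
+  'sink.buffer-flush.interval' = '2s',   -- ... or after 2 seconds, whichever comes first
+  'sink.max-retries' = '5'               -- retry failed writes up to 5 times
+)
+```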
+
+## Data Type Mapping
+
+| SQLServer type | Flink SQL type |
+|----------------|----------------|
+| char(n)       | CHAR(n)        |
+| varchar(n) <br/> nvarchar(n) <br/> nchar(n) | VARCHAR(n) |
+| text <br/> ntext <br/> xml | STRING |
+| decimal(p, s) <br/> money <br/> smallmoney | DECIMAL(p, s) |
+| numeric  | NUMERIC |
+| float <br/> real | FLOAT |
+| bit | BOOLEAN |
+| int | INT |
+| tinyint | TINYINT |
+| smallint | SMALLINT |
+| bigint | BIGINT |
+| time(n) | TIME(n) |
+| datetime2 <br/> datetime <br/> smalldatetime | TIMESTAMP(n) |
+| datetimeoffset | TIMESTAMP_LTZ(3) |
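+
+For instance, under the mapping above, a hypothetical SQLServer table with bigint, decimal, bit and datetime2 columns would be declared on the Flink side roughly as follows (table and column names are made up for illustration):
+
+```sql
+-- Hypothetical Flink-side declaration following the type mapping above.
+CREATE TABLE `sqlserver_typed_table`(
+  `id` BIGINT,               -- bigint in SQLServer
+  `price` DECIMAL(10, 2),    -- decimal(10, 2) in SQLServer
+  `active` BOOLEAN,          -- bit in SQLServer
+  `updated_at` TIMESTAMP(3)  -- datetime2 in SQLServer
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:sqlserver://localhost:1433;databaseName=column_type_test',
+  'table-name' = 'dbo.typed_table'
+)
+```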
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/tdsql-postgresql.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/tdsql-postgresql.md
index 35bef838b..fcea41bef 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/tdsql-postgresql.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/tdsql-postgresql.md
@@ -1,4 +1,112 @@
 ---
 title: TDSQL-PostgreSQL
 sidebar_position: 16
----
\ No newline at end of file
+---
+
+## TDSQL-PostgreSQL Load Node
+
+The `TDSQL-PostgreSQL Load` node supports writing data to the TDSQL-PostgreSQL database. This document describes how to set up the TDSQL-PostgreSQL Load node to run SQL queries against a TDSQL-PostgreSQL database.
+
+## Supported Version
+
+| Load Node                | Driver | Group Id | Artifact Id | JAR |
+|--------------------------|--------|----------|-------------|-----|
+| [TDSQL-PostgreSQL](./tdsql-postgresql.md) | PostgreSQL  | org.postgresql | postgresql | [Download](https://jdbc.postgresql.org/download.html) |
+
+## Dependencies
+
+To set up the TDSQL-PostgreSQL Load node, the following provides dependency information for projects using a build automation tool (such as Maven or SBT) and for SQL Client with Sort Connector JAR bundles.
+
+### Maven Dependency
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-jdbc</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## How to create a TDSQL-PostgreSQL Load Node
+
+### Usage for SQL API
+
+```sql
+
+-- MySQL Extract Node
+CREATE TABLE `mysql_extract_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'mysql-cdc-inlong',
+  'url' = 'jdbc:mysql://localhost:3306/read',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'user'
+)
+
+-- TDSQL-PostgreSQL Load Node
+CREATE TABLE `tdsql_postgresql_load_table`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:postgresql://localhost:5432/write',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'public.user'
+)
+
+-- Write data into TDSQL-PostgreSQL
+INSERT INTO tdsql_postgresql_load_table
+SELECT id, name, age FROM mysql_extract_table;
+
+```
+
+### Usage for InLong Dashboard
+
+TODO: It will be supported in the future.
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## TDSQL-PostgreSQL Load Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify which connector to use, here it should be 'jdbc-inlong'. |
+| url | required | (none) | String | The JDBC database URL. |
+| table-name | required | (none) | String | The name of the JDBC table to connect to. |
+| driver | optional | (none) | String | The class name of the JDBC driver used to connect to this URL; if not set, it is automatically derived from the URL. |
+| username | optional | (none) | String | The JDBC username. 'username' and 'password' must both be specified if either of them is specified. |
+| password | optional | (none) | String | The JDBC password. |
+| connection.max-retry-timeout | optional | 60s | Duration | Maximum timeout between retries; the timeout should be in second granularity and must not be smaller than 1 second. |
+| sink.buffer-flush.max-rows | optional | 100 | Integer | The maximum number of buffered records before flush; can be set to '0' to disable it. |
+| sink.buffer-flush.interval | optional | 1s | Duration | The flush interval, after which an asynchronous thread flushes the buffered data; can be set to '0' to disable it. Note that to process buffered flush events fully asynchronously, set 'sink.buffer-flush.max-rows' to '0' and configure an appropriate flush interval. |
+| sink.max-retries | optional | 3 | Integer | The maximum number of retries if writing records to the database fails. |
+| sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator. |
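+
+As a minimal sketch of how these optional sink settings fit together, the node from the example above could also be declared with explicit buffering and retry options (the table name and option values below are illustrative assumptions, not recommendations):
+
+```sql
+-- Hypothetical TDSQL-PostgreSQL Load node with explicit sink tuning options;
+-- all option values here are illustrative only.
+CREATE TABLE `tdsql_postgresql_load_table_tuned`(
+  PRIMARY KEY (`id`) NOT ENFORCED,
+  `id` BIGINT,
+  `name` STRING,
+  `age` INT
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:postgresql://localhost:5432/write',
+  'username' = 'inlong',
+  'password' = 'inlong',
+  'table-name' = 'public.user',
+  'sink.buffer-flush.max-rows' = '200',  -- flush after 200 buffered rows ...
+  'sink.buffer-flush.interval' = '2s',   -- ... or after 2 seconds, whichever comes first
+  'sink.max-retries' = '5'               -- retry failed writes up to 5 times
+)
+```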
+
+## Data Type Mapping
+
+| TDSQL-PostgreSQL type | Flink SQL type |
+|-----------------|----------------|
+|                 | TINYINT        |
+| SMALLINT <br/> INT2 <br/> SMALLSERIAL <br/> SERIAL2 | SMALLINT |
+| INTEGER <br/> SERIAL | INT |
+| BIGINT <br/> BIGSERIAL | BIGINT |
+| | DECIMAL(20, 0) |
+| REAL <br/> FLOAT4 | FLOAT |
+| FLOAT8 <br/> DOUBLE PRECISION| DOUBLE |
+| NUMERIC(p, s) <br/> DECIMAL(p, s) | DECIMAL(p, s) |
+| BOOLEAN | BOOLEAN |
+| DATE | DATE |
+| TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
+| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
+| CHAR(n) <br/> CHARACTER(n) <br/> VARCHAR(n) <br/> CHARACTER VARYING(n) <br/> TEXT | STRING |
+| BYTEA | BYTES |
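+
+For instance, under the mapping above, a hypothetical TDSQL-PostgreSQL table with NUMERIC, TIMESTAMP and BYTEA columns would be declared on the Flink side roughly as follows (table and column names are made up for illustration):
+
+```sql
+-- Hypothetical Flink-side declaration following the type mapping above.
+CREATE TABLE `tdsql_postgresql_typed_table`(
+  `id` BIGINT,                -- BIGINT / BIGSERIAL in TDSQL-PostgreSQL
+  `price` DECIMAL(10, 2),     -- NUMERIC(10, 2) in TDSQL-PostgreSQL
+  `updated_at` TIMESTAMP(3),  -- TIMESTAMP(3) WITHOUT TIMEZONE in TDSQL-PostgreSQL
+  `payload` BYTES             -- BYTEA in TDSQL-PostgreSQL
+) WITH (
+  'connector' = 'jdbc-inlong',
+  'url' = 'jdbc:postgresql://localhost:5432/write',
+  'table-name' = 'public.typed_table'
+)
+```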
\ No newline at end of file