Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/13 09:22:32 UTC

[inlong-website] branch master updated: [INLONG-531][Sort] Add inlong metric description (#532)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
     new f4bae06c12 [INLONG-531][Sort] Add inlong metric description (#532)
f4bae06c12 is described below

commit f4bae06c12daa99347c522d244f0775a9a5389f3
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Tue Sep 13 17:22:27 2022 +0800

    [INLONG-531][Sort] Add inlong metric description (#532)
---
 docs/data_node/extract_node/kafka.md               |   5 +-
 docs/data_node/extract_node/mongodb-cdc.md         |   8 +-
 docs/data_node/extract_node/mysql-cdc.md           |   7 ++
 docs/data_node/extract_node/oracle-cdc.md          |  15 ++-
 docs/data_node/extract_node/postgresql-cdc.md      |   7 +-
 docs/data_node/extract_node/pulsar.md              |   3 +-
 docs/data_node/extract_node/sqlserver-cdc.md       |  13 ++-
 docs/data_node/load_node/clickhouse.md             |   1 +
 docs/data_node/load_node/elasticsearch.md          |  13 ++-
 docs/data_node/load_node/greenplum.md              |   1 +
 docs/data_node/load_node/hbase.md                  |   5 +-
 docs/data_node/load_node/hdfs.md                   |   9 +-
 docs/data_node/load_node/hive.md                   |   7 ++
 docs/data_node/load_node/iceberg.md                |   1 +
 docs/data_node/load_node/kafka.md                  |   5 +-
 docs/data_node/load_node/mysql.md                  |   1 +
 docs/data_node/load_node/oracle.md                 |   1 +
 docs/data_node/load_node/postgresql.md             |   6 +-
 docs/data_node/load_node/sqlserver.md              |   1 +
 docs/data_node/load_node/tdsql-postgresql.md       |   1 +
 docs/modules/sort/metrics.md                       | 100 ++++++++++++++++++++
 .../current/data_node/extract_node/kafka.md        |   5 +-
 .../current/data_node/extract_node/mongodb-cdc.md  |   7 +-
 .../current/data_node/extract_node/mysql-cdc.md    |   7 ++
 .../current/data_node/extract_node/oracle-cdc.md   |  15 ++-
 .../data_node/extract_node/postgresql-cdc.md       |   7 +-
 .../current/data_node/extract_node/pulsar.md       |   5 +-
 .../data_node/extract_node/sqlserver-cdc.md        |  11 ++-
 .../current/data_node/load_node/clickhouse.md      |   1 +
 .../current/data_node/load_node/elasticsearch.md   |   7 ++
 .../current/data_node/load_node/greenplum.md       |   1 +
 .../current/data_node/load_node/hbase.md           |   5 +-
 .../current/data_node/load_node/hdfs.md            |   9 +-
 .../current/data_node/load_node/hive.md            |   7 ++
 .../current/data_node/load_node/iceberg.md         |   1 +
 .../current/data_node/load_node/kafka.md           |   5 +-
 .../current/data_node/load_node/mysql.md           |   1 +
 .../current/data_node/load_node/oracle.md          |   1 +
 .../current/data_node/load_node/postgresql.md      |   4 +-
 .../current/data_node/load_node/sqlserver.md       |   1 +
 .../data_node/load_node/tdsql-postgresql.md        |   1 +
 .../current/modules/sort/example.md                |   2 +-
 .../current/modules/sort/metrics.md                | 101 +++++++++++++++++++++
 43 files changed, 364 insertions(+), 50 deletions(-)

diff --git a/docs/data_node/extract_node/kafka.md b/docs/data_node/extract_node/kafka.md
index d843c1dec5..d365c9241f 100644
--- a/docs/data_node/extract_node/kafka.md
+++ b/docs/data_node/extract_node/kafka.md
@@ -70,7 +70,7 @@ Flink SQL> CREATE TABLE kafka_extract_node (
           `name` STRINTG,
            PRIMARY KEY (`id`) NOT ENFORCED
           ) WITH (
-          'connector' = 'upsert-kafka',
+          'connector' = 'upsert-kafka-inlong',
           'topic' = 'user',
           'properties.bootstrap.servers' = 'localhost:9092',
           'properties.group.id' = 'testGroup',
@@ -94,7 +94,7 @@ TODO: It will be supported in the future.
 
 | Option | Required | Default | Type | Description |
 |---------|----------|---------|------|------------|
-| connector | required | (none) | String | Specify which connector to use, valid values are:  1. for the Upsert Kafka use: `upsert-kafka`  2. for normal Kafka use: `kafka-inlong` |
+| connector | required | (none) | String | Specify which connector to use, valid values are:  1. for the Upsert Kafka use: `upsert-kafka-inlong`  2. for normal Kafka use: `kafka-inlong` |
 | topic | optional | (none) | String | Topic name(s) to read data from when the table is used as source. It also supports  topic list for source by separating topic by semicolon like `topic-1;topic-2`. Note, only one of `topic-pattern` and `topic` can be specified for sources. |
 | topic-pattern | optional | (none) | String | The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of `topic-pattern` and `topic` can be specified for sources. |
 | properties.bootstrap.servers | required | (none) | String | Comma separated list of Kafka brokers. |
@@ -110,6 +110,7 @@ TODO: It will be supported in the future.
 | scan.startup.specific-offsets | optional | (none) | String | Specify offsets for each partition in case of 'specific-offsets' startup mode, e.g. 'partition:0,offset:42;partition:1,offset:300'. |
 | scan.startup.timestamp-millis | optional | (none) | Long | Start from the specified epoch timestamp (milliseconds) used in case of 'timestamp' startup mode. |
 | scan.topic-partition-discovery.interval | optional | (none) | Duration | Interval for consumer to discover dynamically created Kafka topics and partitions periodically. |
+| inlong.metric | optional | (none) | String | Inlong metric label. The format of the value is groupId&streamId&nodeId. |
 | sink.ignore.changelog | optional | false | Boolean |  Importing all changelog mode data ingest into Kafka . |
 
 ## Available Metadata
diff --git a/docs/data_node/extract_node/mongodb-cdc.md b/docs/data_node/extract_node/mongodb-cdc.md
index 701a6e4c95..b3e9862f84 100644
--- a/docs/data_node/extract_node/mongodb-cdc.md
+++ b/docs/data_node/extract_node/mongodb-cdc.md
@@ -90,7 +90,7 @@ Flink SQL> CREATE TABLE mongodb_extract_node (
   suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
   PRIMARY KEY(_id) NOT ENFORCED
 ) WITH (
-  'connector' = 'mongodb-cdc',
+  'connector' = 'mongodb-cdc-inlong',
   'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
   'username' = 'flinkuser',
   'password' = 'flinkpw',
@@ -118,7 +118,7 @@ TODO: It will be supported in the future.
 
 | **Option**                | **Required** | **Default**      | **Type** | **Description**                                              |
 | ------------------------- | ------------ | ---------------- | -------- | ------------------------------------------------------------ |
-| connector                 | required     | (none)           | String   | Specify what connector to use, here should be `mongodb-cdc`. |
+| connector                 | required     | (none)           | String   | Specify what connector to use, here should be `mongodb-cdc-inlong`. |
 | hosts                     | required     | (none)           | String   | The comma-separated list of hostname and port pairs of the MongoDB servers. eg. `localhost:27017,localhost:27018` |
 | username                  | optional     | (none)           | String   | Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
 | password                  | optional     | (none)           | String   | Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication. |
@@ -134,7 +134,7 @@ TODO: It will be supported in the future.
 | poll.max.batch.size       | optional     | 1000             | Integer  | Maximum number of change stream documents to include in a single batch when polling for new data. |
 | poll.await.time.ms        | optional     | 1500             | Integer  | The amount of time to wait before checking for new results on the change stream. |
 | heartbeat.interval.ms     | optional     | 0                | Integer  | The length of time in milliseconds between sending heartbeat messages. Use 0 to disa |
-
+| inlong.metric | optional | (none) | String | Inlong metric label. The format of the value is groupId&streamId&nodeId. |
 
 ## Available Metadata
 
@@ -161,7 +161,7 @@ CREATE TABLE `mysql_extract_node` (
     suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
     PRIMARY KEY(_id) NOT ENFORCED
 ) WITH (
-      'connector' = 'mongodb-cdc', 
+      'connector' = 'mongodb-cdc-inlong', 
       'hostname' = 'YourHostname',
       'username' = 'YourUsername',
       'password' = 'YourPassword',
diff --git a/docs/data_node/extract_node/mysql-cdc.md b/docs/data_node/extract_node/mysql-cdc.md
index 0c20d01da5..0de9133506 100644
--- a/docs/data_node/extract_node/mysql-cdc.md
+++ b/docs/data_node/extract_node/mysql-cdc.md
@@ -304,6 +304,13 @@ TODO: It will be supported in the future.
           For example: <code>'debezium.snapshot.mode' = 'never'</code>.
           See more about the <a href="https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties">Debezium's MySQL Connector properties</a></td> 
     </tr>
+    <tr>
+      <td>inlong.metric</td>
+      <td>optional</td>
+      <td style={{wordWrap: 'break-word'}}>(none)</td>
+      <td>String</td>
+      <td>Inlong metric label. The format of the value is groupId&streamId&nodeId.</td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/docs/data_node/extract_node/oracle-cdc.md b/docs/data_node/extract_node/oracle-cdc.md
index 0bc27e8bd6..b98b89f767 100644
--- a/docs/data_node/extract_node/oracle-cdc.md
+++ b/docs/data_node/extract_node/oracle-cdc.md
@@ -201,7 +201,7 @@ See more about the [Setting up Oracle](https://debezium.io/documentation/referen
 
 The Oracle Extract Node can be defined as following:
 
-```sql 
+```sql
 -- Create an Oracle Extract Node 'user' in Flink SQL
 Flink SQL> CREATE TABLE oracle_extract_node (
      ID INT NOT NULL,
@@ -210,7 +210,7 @@ Flink SQL> CREATE TABLE oracle_extract_node (
      WEIGHT DECIMAL(10, 3),
      PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
-     'connector' = 'oracle-cdc',
+     'connector' = 'oracle-cdc-inlong',
      'hostname' = 'localhost',
      'port' = '1521',
      'username' = 'flinkuser',
@@ -252,7 +252,7 @@ TODO: It will be supported in the future.
       <td>required</td>
       <td style={{wordWrap: 'break-word'}}>(none)</td>
       <td>String</td>
-      <td>Specify what connector to use, here should be <code>'oracle-cdc'</code>.</td>
+      <td>Specify what connector to use, here should be <code>'oracle-cdc-inlong'</code>.</td>
     </tr>
     <tr>
       <td>hostname</td>
@@ -321,6 +321,13 @@ TODO: It will be supported in the future.
           For example: <code>'debezium.snapshot.mode' = 'never'</code>.
           See more about the <a href="https://debezium.io/documentation/reference/1.5/connectors/oracle.html#oracle-connector-properties">Debezium's Oracle Connector properties</a></td> 
      </tr>
+     <tr>
+      <td>inlong.metric</td>
+      <td>optional</td>
+      <td style={{wordWrap: 'break-word'}}>(none)</td>
+      <td>String</td>
+      <td>Inlong metric label. The format of the value is groupId&streamId&nodeId.</td>
+    </tr>
     </tbody>
 </table>    
 </div>
@@ -387,7 +394,7 @@ CREATE TABLE products (
     WEIGHT DECIMAL(10, 3),
     PRIMARY KEY(id) NOT ENFORCED
 ) WITH (
-    'connector' = 'oracle-cdc',
+    'connector' = 'oracle-cdc-inlong',
     'hostname' = 'localhost',
     'port' = '1521',
     'username' = 'flinkuser',
diff --git a/docs/data_node/extract_node/postgresql-cdc.md b/docs/data_node/extract_node/postgresql-cdc.md
index c43b589803..f1e4de9fd2 100644
--- a/docs/data_node/extract_node/postgresql-cdc.md
+++ b/docs/data_node/extract_node/postgresql-cdc.md
@@ -78,7 +78,7 @@ CREATE TABLE `postgresTable`(
   `name` STRING,
   `age` INT
 ) WITH (
-  'connector' = 'postgres-cdc',
+  'connector' = 'postgres-cdc-inlong',
   'hostname' = 'localhost',
   'username' = 'postgres',
   'password' = 'inlong',
@@ -103,7 +103,7 @@ TODO: It will be supported in the future.
 
 | Option | Required | Default | Type | Description |
 |---------|----------|---------|------|------------|
-| connector | required | (none) | String | Specify what connector to use, here should be `postgres-cdc`.|
+| connector | required | (none) | String | Specify what connector to use, here should be `postgres-cdc-inlong`.|
 | hostname | required | (none) | String | IP address or hostname of the PostgreSQL database server. |
 | username | required | (none) | String | Name of the PostgreSQL database to use when connecting to the PostgreSQL database server. |
 | password | required | (none) | String | Password to use when connecting to the PostgreSQL database server. |
@@ -114,6 +114,7 @@ TODO: It will be supported in the future.
 | decoding.plugin.name | optional | decoderbufs | String | The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput. |
 | slot.name | optional | flink | String | The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring. Slot names must conform to PostgreSQL replication slot naming rules, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character." |
 | debezium.* | optional | (none) | String | Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from Postgres server. For example: 'debezium.snapshot.mode' = 'never'. See more about the [Debezium's Postgres Connector properties](https://debezium.io/documentation/reference/1.5/connectors/postgresql.html#postgresql-connector-properties). |
+| inlong.metric | optional | (none) | String | Inlong metric label. The format of the value is groupId&streamId&nodeId. |
 
 **Note**: `slot.name` is recommended to set for different tables to avoid the potential PSQLException: ERROR: replication slot "flink" is active for PID 974 error.  
 **Note**: PSQLException: ERROR: all replication slots are in use Hint: Free one or increase max_replication_slots. We can delete slot by the following statement.
@@ -144,7 +145,7 @@ CREATE TABLE postgresTable (
     `name` STRING,
     `age` INT
 ) WITH (
-     'connector' = 'postgres-cdc',
+     'connector' = 'postgres-cdc-inlong',
      'hostname' = 'localhost',
      'username' = 'postgres',
      'password' = 'inlong',
diff --git a/docs/data_node/extract_node/pulsar.md b/docs/data_node/extract_node/pulsar.md
index 7cdf245804..f0fd6919ea 100644
--- a/docs/data_node/extract_node/pulsar.md
+++ b/docs/data_node/extract_node/pulsar.md
@@ -56,7 +56,7 @@ CREATE TABLE pulsar (
   `key` STRING ,
   `physical_3` BOOLEAN
 ) WITH (
-  'connector' = 'pulsar',
+  'connector' = 'pulsar-inlong',
   'topic' = 'persistent://public/default/topic82547611',
   'key.format' = 'raw',
   'key.fields' = 'key',
@@ -107,6 +107,7 @@ TODO
 | key.fields-prefix             | optional | (none)        | String | Define a custom prefix for all fields in the key format to avoid name conflicts with fields in the value format. By default, the prefix is empty. If a custom prefix is defined, the Table schema and `key.fields` are used. |
 | format or value.format        | required | (none)        | String | Set the name with a prefix. When constructing data types in the key format, the prefix is removed and non-prefixed names are used within the key format. Pulsar message value serialization format, support JSON, Avro, etc. For more information, see the Flink format. |
 | value.fields-include          | optional | ALL           | Enum   | The Pulsar message value contains the field policy, optionally ALL, and EXCEPT_KEY. |
+| inlong.metric | optional | (none) | String | Inlong metric label. The format of the value is groupId&streamId&nodeId. |
 
 ## Available Metadata
 
diff --git a/docs/data_node/extract_node/sqlserver-cdc.md b/docs/data_node/extract_node/sqlserver-cdc.md
index 8f23a6fe5c..54edd11a95 100644
--- a/docs/data_node/extract_node/sqlserver-cdc.md
+++ b/docs/data_node/extract_node/sqlserver-cdc.md
@@ -90,7 +90,7 @@ Flink SQL> CREATE TABLE sqlserver_extract_node (
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
      ) WITH (
-     'connector' = 'sqlserver-cdc',
+     'connector' = 'sqlserver-cdc-inlong',
      'hostname' = 'YourHostname',
      'port' = 'port', --default:1433
      'username' = 'YourUsername',
@@ -127,7 +127,7 @@ TODO
       <td>required</td>
       <td style={{wordWrap: 'break-word'}}>(none)</td>
       <td>String</td>
-      <td>Specify what connector to use, here should be 'sqlserver-cdc'.</td>
+      <td>Specify what connector to use, here should be 'sqlserver-cdc-inlong'.</td>
     </tr>
     <tr>
       <td>hostname</td>
@@ -185,6 +185,13 @@ TODO
       <td>String</td>
       <td>The session time zone in database server, e.g. "Asia/Shanghai".</td>
     </tr>
+    <tr>
+      <td>inlong.metric</td>
+      <td>optional</td>
+      <td style={{wordWrap: 'break-word'}}>(none)</td>
+      <td>String</td>
+      <td>Inlong metric label. The format of the value is groupId&streamId&nodeId.</td>
+    </tr>
     </tbody>
 </table>
 </div>
@@ -233,7 +240,7 @@ CREATE TABLE sqlserver_extract_node (
     operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
     id INT NOT NULL
 ) WITH (
-    'connector' = 'sqlserver-cdc',
+    'connector' = 'sqlserver-cdc-inlong',
     'hostname' = 'localhost',
     'port' = '1433',
     'username' = 'sa',
diff --git a/docs/data_node/load_node/clickhouse.md b/docs/data_node/load_node/clickhouse.md
index 3b9b20b925..98ea388e14 100644
--- a/docs/data_node/load_node/clickhouse.md
+++ b/docs/data_node/load_node/clickhouse.md
@@ -99,6 +99,7 @@ TODO: It will be supported in the future.
 | sink.buffer-flush.interval | optional | 1s | Duration | The flush interval mills, over this time, asynchronous threads will flush data. Can be set to '0' to disable it. Note, 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. | |
 | sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
 | sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+| inlong.metric | optional | (none) | String | Inlong metric label. The format of the value is groupId&streamId&nodeId. |
 
 ## Data Type Mapping
 
diff --git a/docs/data_node/load_node/elasticsearch.md b/docs/data_node/load_node/elasticsearch.md
index 635462c8bc..5fc9ec3c64 100644
--- a/docs/data_node/load_node/elasticsearch.md
+++ b/docs/data_node/load_node/elasticsearch.md
@@ -58,7 +58,7 @@ CREATE TABLE myUserTable (
   pv BIGINT,
   PRIMARY KEY (user_id) NOT ENFORCED
 ) WITH (
-  'connector' = 'elasticsearch-7',
+  'connector' = 'elasticsearch-7-inlong',
   'hosts' = 'http://localhost:9200',
   'index' = 'users'
 );
@@ -92,8 +92,8 @@ TODO: It will be supported in the future.
       <td>String</td>
       <td>Specify what connector to use, valid values are:
       <ul>
-      <li><code>elasticsearch-6</code>: connect to Elasticsearch 6.x cluster.</li>
-      <li><code>elasticsearch-7</code>: connect to Elasticsearch 7.x and later versions cluster.</li>
+      <li><code>elasticsearch-6-inlong</code>: connect to Elasticsearch 6.x cluster.</li>
+      <li><code>elasticsearch-7-inlong</code>: connect to Elasticsearch 7.x and later versions cluster.</li>
       </ul></td>
     </tr>
     <tr>
@@ -242,6 +242,13 @@ TODO: It will be supported in the future.
        By default uses built-in <code>'json'</code> format. Please refer to <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/">JSON Format</a> page for more details.
       </td>
     </tr>
+    <tr>
+      <td>inlong.metric</td>
+      <td>optional</td>
+      <td style={{wordWrap: 'break-word'}}>(none)</td>
+      <td>String</td>
+      <td>Inlong metric label. The format of the value is groupId&streamId&nodeId.</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/data_node/load_node/greenplum.md b/docs/data_node/load_node/greenplum.md
index f53c82d1d2..29778b0fa6 100644
--- a/docs/data_node/load_node/greenplum.md
+++ b/docs/data_node/load_node/greenplum.md
@@ -97,6 +97,7 @@ TODO: It will be supported in the future.
 | sink.buffer-flush.interval | optional | 1s | Duration | The flush interval mills, over this time, asynchronous threads will flush data. Can be set to '0' to disable it. Note, 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. | |
 | sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
 | sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+| inlong.metric | optional | (none) | String | Inlong metric label. The format of the value is groupId&streamId&nodeId. |
 
 ## Data Type Mapping
 
diff --git a/docs/data_node/load_node/hbase.md b/docs/data_node/load_node/hbase.md
index 9845283069..68ff8f4e79 100644
--- a/docs/data_node/load_node/hbase.md
+++ b/docs/data_node/load_node/hbase.md
@@ -51,7 +51,7 @@ CREATE TABLE hbase_load_node (
     family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
     PRIMARY KEY (rowkey) NOT ENFORCED
 ) WITH (
-      'connector' = 'hbase-2.2',
+      'connector' = 'hbase-2.2-inlong',
       'table-name' = 'mytable',
       'zookeeper.quorum' = 'localhost:2181'
 );
@@ -82,7 +82,7 @@ TODO: It will be supported in the future.
 
 | Option | Required | Default | Type | Description |
 |---------|----------|---------|------|------------|
-| connector | required | (none) | String | Specify what connector to use, valid values are: hbase-2.2: connect to HBase 2.2.x cluster |
+| connector | required | (none) | String | Specify what connector to use, valid values are: hbase-2.2-inlong: connect to HBase 2.2.x cluster |
 | table-name | required | (none) | String | The name of HBase table to connect. |
 | zookeeper.quorum | required | (none) | String | The HBase Zookeeper quorum. |
 | zookeeper.znode.parent | optional | /hbase | String | The root dir in Zookeeper for HBase cluster. |
@@ -96,6 +96,7 @@ TODO: It will be supported in the future.
 | lookup.cache.ttl | optional | (none) | Duration | The max time to live for each rows in lookup cache, over this time, the oldest rows will be expired. Note, "cache.max-rows" and "cache.ttl" options must all be specified if any of them is specified.Lookup cache is disabled by default. |
 | lookup.max-retries | optional | 3 | Integer | The max retry times if lookup database failed. |
 | properties.* | optional | (none) | String | This can set and pass arbitrary HBase configurations. Suffix names must match the configuration key defined in [HBase Configuration documentation](https://hbase.apache.org/2.3/book.html#hbase_default_configurations). Flink will remove the "properties." key prefix and pass the transformed key and values to the underlying HBaseClient. For example, you can add a kerberos authentication parameter 'properties.hbase.security.authentication' = 'kerb [...]
+| inlong.metric | optional | (none) | String | Inlong metric label. The format of the value is groupId&streamId&nodeId. |
 
 ## Data Type Mapping
 
diff --git a/docs/data_node/load_node/hdfs.md b/docs/data_node/load_node/hdfs.md
index 72ce1f4d7e..68f1ee9f64 100644
--- a/docs/data_node/load_node/hdfs.md
+++ b/docs/data_node/load_node/hdfs.md
@@ -24,7 +24,7 @@ CREATE TABLE hdfs_load_node (
   dt STRING,
  `hour` STRING
   ) PARTITIONED BY (dt, `hour`) WITH (
-    'connector'='filesystem',
+    'connector'='filesystem-inlong',
     'path'='...',
     'format'='orc',
     'sink.partition-commit.delay'='1 h',
@@ -106,6 +106,13 @@ The file sink supports file compactions, which allows applications to have small
       <td>String</td>
       <td>The compaction target file size, the default value is the rolling file size.</td>
     </tr>
+    <tr>
+      <td>inlong.metric</td>
+      <td>optional</td>
+      <td style={{wordWrap: 'break-word'}}>(none)</td>
+      <td>String</td>
+      <td>Inlong metric label. The format of the value is groupId&streamId&nodeId.</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/data_node/load_node/hive.md b/docs/data_node/load_node/hive.md
index 5cd71131c0..fc9261c5b8 100644
--- a/docs/data_node/load_node/hive.md
+++ b/docs/data_node/load_node/hive.md
@@ -129,6 +129,13 @@ TODO: It will be supported in the future.
       custom: use policy class to create a commit policy.
       Support to configure multiple policies: 'metastore,success-file'.</td>
     </tr>
+    <tr>
+      <td>inlong.metric</td>
+      <td>optional</td>
+      <td style={{wordWrap: 'break-word'}}>(none)</td>
+      <td>String</td>
+      <td>Inlong metric label. The format of the value is groupId&streamId&nodeId.</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/data_node/load_node/iceberg.md b/docs/data_node/load_node/iceberg.md
index 80cf8d09d8..a839ce2021 100644
--- a/docs/data_node/load_node/iceberg.md
+++ b/docs/data_node/load_node/iceberg.md
@@ -162,6 +162,7 @@ TODO
 | clients          | optional for hive catalog                   | 2       | Integer | The Hive metastore client pool size, default value is 2.     |
 | warehouse        | optional for hadoop catalog or hive catalog | (none)  | String  | For Hive catalog,is the Hive warehouse location, users should specify this path if neither set the `hive-conf-dir` to specify a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to classpath. For hadoop catalog,The HDFS directory to store metadata files and data files. |
 | hive-conf-dir    | optional for hive catalog                   | (none)  | String  | Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or hive configure file from classpath) will be overwrote with the `warehouse` value if setting both `hive-conf-dir` and `warehouse` when creating iceberg catalog. |
+| inlong.metric | optional | (none) | String | Inlong metric label. The format of the value is groupId&streamId&nodeId. |
 
 ## Data Type Mapping
 
diff --git a/docs/data_node/load_node/kafka.md b/docs/data_node/load_node/kafka.md
index a3891cd0f0..04f16d1891 100644
--- a/docs/data_node/load_node/kafka.md
+++ b/docs/data_node/load_node/kafka.md
@@ -60,7 +60,7 @@ Flink SQL> CREATE TABLE kafka_load_node (
           `name` STRINTG,
            PRIMARY KEY (`id`) NOT ENFORCED
           ) WITH (
-          'connector' = 'upsert-kafka',
+          'connector' = 'upsert-kafka-inlong',
           'topic' = 'user',
           'properties.bootstrap.servers' = 'localhost:9092',
           'key.format' = 'csv',
@@ -81,7 +81,7 @@ TODO: It will be supported in the future.
 
 | Option | Required | Default | Type | Description |
 |---------|----------|---------|------|------------|
-| connector | required | (none) | String | Specify which connector to use, valid values are:  1. for the Upsert Kafka use: `upsert-kafka`  2. for normal Kafka use: `kafka-inlong` |
+| connector | required | (none) | String | Specify which connector to use, valid values are:  1. for the Upsert Kafka use: `upsert-kafka-inlong`  2. for normal Kafka use: `kafka-inlong` |
 | topic | required | (none) | String | Topic name(s) to read data from when the table is used as source. It also supports  topic list for source by separating topic by semicolon like `topic-1;topic-2`. Note, only one of `topic-pattern` and `topic` can be specified for sources. |
 | properties.bootstrap.servers | required | (none) | String | Comma separated list of Kafka brokers. |
 | properties.* | optional | (none) | String | This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in [Kafka Configuration documentation](https://kafka.apache.org/documentation/#configuration). Flink will remove the `properties.` key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via `properties.allow.auto.create.topics` = `false`. But there are some [...]
@@ -94,6 +94,7 @@ TODO: It will be supported in the future.
 | sink.partitioner | optional | 'default' | String | Output partitioning from Flink's partitions into Kafka's partitions. Valid values are <br/>`default`: use the kafka default partitioner to partition records. <br/>`fixed`: each Flink partition ends up in at most one Kafka partition. <br/>`round-robin`: a Flink partition is distributed to Kafka partitions sticky round-robin. It only works when record's keys are not specified. Custom FlinkKafkaPartitioner subclass: e.g. 'org.mycompany.My [...]
 | sink.semantic | optional | at-least-once | String | Defines the delivery semantic for the Kafka sink. Valid enumerationns are 'at-least-once', 'exactly-once' and 'none'. See [Consistency guarantees](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#consistency-guarantees) for more details. |
 | sink.parallelism | optional | (none) | Integer | Defines the parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+| inlong.metric | optional | (none) | String | Inlong metric label. The format of the value is groupId&streamId&nodeId. |
 
 ## Available Metadata
 
diff --git a/docs/data_node/load_node/mysql.md b/docs/data_node/load_node/mysql.md
index 6279780514..a581ec3d8d 100644
--- a/docs/data_node/load_node/mysql.md
+++ b/docs/data_node/load_node/mysql.md
@@ -97,6 +97,7 @@ TODO: It will be supported in the future.
 | sink.buffer-flush.interval | optional | 1s | Duration | The flush interval mills, over this time, asynchronous threads will flush data. Can be set to '0' to disable it. Note, 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. | |
 | sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
 | sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+| inlong.metric | optional | (none) | String | Inlong metric label. The format of the value is groupId&streamId&nodeId. |
 
 ## Data Type Mapping
 
diff --git a/docs/data_node/load_node/oracle.md b/docs/data_node/load_node/oracle.md
index 37b9cb5db6..a2f98884e3 100644
--- a/docs/data_node/load_node/oracle.md
+++ b/docs/data_node/load_node/oracle.md
@@ -97,6 +97,7 @@ TODO: It will be supported in the future.
 | sink.buffer-flush.interval | optional | 1s | Duration | The flush interval mills, over this time, asynchronous threads will flush data. Can be set to '0' to disable it. Note, 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. | |
 | sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
 | sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+| inlong.metric | optional | (none) | String | Inlong metric label. The format of the value is groupId&streamId&nodeId. |
 
 ## Data Type Mapping
 
diff --git a/docs/data_node/load_node/postgresql.md b/docs/data_node/load_node/postgresql.md
index dd92041cee..c987c82c33 100644
--- a/docs/data_node/load_node/postgresql.md
+++ b/docs/data_node/load_node/postgresql.md
@@ -59,7 +59,8 @@ CREATE TABLE `postgresql_load_table`(
   `name` STRING,
   `age` INT
 ) WITH (
-  'connector' = 'jdbc',
+  'connector' = 'jdbc-inlong',
+  'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.PostgresDialect',
   'url' = 'jdbc:postgresql://localhost:5432/write',
   'username' = 'inlong',
   'password' = 'inlong',
@@ -84,7 +85,7 @@ TODO: It will be supported in the future.
 
 | Option | Required | Default | Type | Description |
 |---------|----------|---------|------|------------|
-| connector | required | (none) | String | Specify what connector to use, here should be 'jdbc'. |
+| connector | required | (none) | String | Specify what connector to use, here should be 'jdbc-inlong'. |
 | url | required | (none) | String | The JDBC database url. |
 | table-name | required | (none) | String | The name of JDBC table to connect. |
 | driver | optional | (none) | String | The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL. |
@@ -95,6 +96,7 @@ TODO: It will be supported in the future.
 | sink.buffer-flush.interval | optional | 1s | Duration | The flush interval mills, over this time, asynchronous threads will flush data. Can be set to '0' to disable it. Note, 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. | |
 | sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
 | sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+| inlong.metric | optional | (none) | String | Inlong metric label. The format of the value is groupId&streamId&nodeId. |
 
 ## Data Type Mapping
 
diff --git a/docs/data_node/load_node/sqlserver.md b/docs/data_node/load_node/sqlserver.md
index 40c8726054..3475ca9c35 100644
--- a/docs/data_node/load_node/sqlserver.md
+++ b/docs/data_node/load_node/sqlserver.md
@@ -95,6 +95,7 @@ TODO: It will be supported in the future.
 | sink.buffer-flush.interval | optional | 1s | Duration | The flush interval mills, over this time, asynchronous threads will flush data. Can be set to '0' to disable it. Note, 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. | |
 | sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
 | sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+| inlong.metric | optional | (none) | String | Inlong metric label. The format of the value is groupId&streamId&nodeId. |
 
 ## Data Type Mapping
 
diff --git a/docs/data_node/load_node/tdsql-postgresql.md b/docs/data_node/load_node/tdsql-postgresql.md
index cf34ab5dc0..40d0eff8c8 100644
--- a/docs/data_node/load_node/tdsql-postgresql.md
+++ b/docs/data_node/load_node/tdsql-postgresql.md
@@ -95,6 +95,7 @@ TODO: It will be supported in the future.
 | sink.buffer-flush.interval | optional | 1s | Duration | The flush interval mills, over this time, asynchronous threads will flush data. Can be set to '0' to disable it. Note, 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. | |
 | sink.max-retries | optional | 3 | Integer | The max retry times if writing records to database failed. |
 | sink.parallelism | optional | (none) | Integer | Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator. |
+| inlong.metric | optional | (none) | String | Inlong metric label. The format of the value is groupId&streamId&nodeId. |
 
 ## Data Type Mapping
 
diff --git a/docs/modules/sort/metrics.md b/docs/modules/sort/metrics.md
new file mode 100644
index 0000000000..d15ced5d1d
--- /dev/null
+++ b/docs/modules/sort/metrics.md
@@ -0,0 +1,100 @@
+---
+title: Monitor Metrics
+sidebar_position: 4
+---
+
+## Overview
+
+We add metric computing for each node. Sort computes the metrics of a node as soon as the node is declared with the option `inlong.metric`, whose value has the format `groupId&streamId&nodeId`.
+Sort exports the metrics through the Flink metric group, so users can collect them with a [metric reporter](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/deployment/metric_reporters/).
+
+## Metrics
+
+### Supported extract nodes
+
+| metric name | extract node | description |
+|-------------|--------------|-------------|
+| groupId_streamId_nodeId_numRecordsIn | kafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc | number of input records |
+| groupId_streamId_nodeId_numBytesIn | kafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc | number of input bytes |
+| groupId_streamId_nodeId_numRecordsInPerSecond | kafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc | number of input records per second |
+| groupId_streamId_nodeId_numBytesInPerSecond | kafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc | number of input bytes per second |
+
+### Supported load nodes
+
+| metric name | load node | description |
+|-------------|-----------|-------------|
+| groupId_streamId_nodeId_numRecordsOut | clickhouse,elasticsearch,greenplum,hbase,<br/>hdfs,hive,iceberg,kafka,mysql,<br/>oracle,postgresql,sqlserver,tdsql-postgresql | number of output records |
+| groupId_streamId_nodeId_numBytesOut | clickhouse,elasticsearch,greenplum,hbase,<br/>hdfs,hive,iceberg,kafka,mysql,<br/>oracle,postgresql,sqlserver,tdsql-postgresql | number of output bytes |
+| groupId_streamId_nodeId_numRecordsOutPerSecond | clickhouse,elasticsearch,greenplum,hbase,<br/>hdfs,hive,iceberg,kafka,mysql,<br/>oracle,postgresql,sqlserver,tdsql-postgresql | number of output records per second |
+| groupId_streamId_nodeId_numBytesOutPerSecond | clickhouse,elasticsearch,greenplum,hbase,<br/>hdfs,hive,iceberg,kafka,mysql,<br/>oracle,postgresql,sqlserver,tdsql-postgresql | number of output bytes per second |
+
+## Usage
+
+The following example synchronizes MySQL data to PostgreSQL and shows how to use the metrics.
+
+* Use Flink SQL:
+```sql
+ CREATE TABLE `table_groupId_streamId_nodeId1`(
+     `id` INT,
+     `name` STRING,
+     `age` INT,
+     PRIMARY KEY(`id`) NOT ENFORCED)
+    WITH (
+        'connector' = 'mysql-cdc-inlong',
+        'hostname' = 'xxxx',
+        'username' = 'xxx',
+        'password' = 'xxx',
+        'database-name' = 'test',
+        'scan.incremental.snapshot.enabled' = 'true',
+        'server-time-zone' = 'GMT+8',
+        'table-name' = 'user',
+        'inlong.metric' = 'mysqlGroup&mysqlStream&mysqlNode1'
+);
+
+ CREATE TABLE `table_groupId_streamId_nodeId2`(
+     PRIMARY KEY (`id`) NOT ENFORCED,
+     `id` INT,
+     `name` STRING,
+     `age` INT)
+     WITH (
+         'connector' = 'jdbc-inlong',
+         'url' = 'jdbc:postgresql://ip:5432/postgres',
+         'username' = 'postgres',
+         'password' = 'inlong',
+         'table-name' = 'public.user',
+         'inlong.metric' = 'pggroup&pgStream&pgNode'
+         );
+
+ INSERT INTO `table_groupId_streamId_nodeId2`
+ SELECT
+     `id`,
+     `name`,
+     `age`
+ FROM `table_groupId_streamId_nodeId1`;
+```
+
+* We can add a metric reporter in `flink-conf.yaml`:
+
+```yaml
+metrics.reporters: promgateway
+metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
+metrics.reporter.promgateway.host: ip
+metrics.reporter.promgateway.port: 9091
+metrics.reporter.promgateway.interval: 60 SECONDS
+```
+`ip` and `port` are the host and port of your [pushgateway](https://github.com/prometheus/pushgateway/releases).
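+
+Optionally, the same reporter can be given a job name and grouping key so that metrics from different Sort jobs stay separate in the Pushgateway. This is a sketch based on the options of Flink's `PrometheusPushGatewayReporter`; the values below are placeholders:
+
+```yaml
+# optional extras for the promgateway reporter
+metrics.reporter.promgateway.jobName: inlong-sort-job         # job name pushed to the gateway
+metrics.reporter.promgateway.randomJobNameSuffix: true        # append a random suffix to avoid collisions between jobs
+metrics.reporter.promgateway.deleteOnShutdown: false          # keep the last pushed values after the job stops
+metrics.reporter.promgateway.groupingKey: groupId=mysqlGroup  # extra label(s) attached to all pushed metrics, format k1=v1;k2=v2
+```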
+
+* We can visit http://ip:port of the pushgateway after the Flink SQL job starts.
+When the reporter is `org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter`, metric names get the prefix `flink_taskmanager_job_task_operator`.  
+The full metric names are:  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsIn`,  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesIn`,  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsInPerSecond`,  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesInPerSecond`,  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOut`,  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOut`,  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOutPerSecond`,  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOutPerSecond`.
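+
+For instance, with the `inlong.metric` labels used in the Flink SQL example above, the Pushgateway would expose names such as the following (derived from the naming scheme in the tables; shown only as an illustration):
+
+```
+flink_taskmanager_job_task_operator_mysqlGroup_mysqlStream_mysqlNode1_numRecordsIn
+flink_taskmanager_job_task_operator_pggroup_pgStream_pgNode_numRecordsOut
+```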
+
+
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/kafka.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/kafka.md
index 2acd5dcb85..af259afc3f 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/kafka.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/kafka.md
@@ -68,7 +68,7 @@ Flink SQL> CREATE TABLE kafka_extract_node (
           `name` STRINTG,
            PRIMARY KEY (`id`) NOT ENFORCED
           ) WITH (
-          'connector' = 'upsert-kafka',
+          'connector' = 'upsert-kafka-inlong',
           'topic' = 'user',
           'properties.bootstrap.servers' = 'localhost:9092',
           'properties.group.id' = 'testGroup',
@@ -92,7 +92,7 @@ TODO: 将在未来支持此功能。
 
 | 参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
 |---------|----------|---------|------|------------|
-| connector | 必选 | (none) | String | 指定要使用的连接器  1. Upsert Kafka 连接器使用: `upsert-kafka`  2. Kafka连接器使用: `kafka-inlong` |
+| connector | 必选 | (none) | String | 指定要使用的连接器  1. Upsert Kafka 连接器使用: `upsert-kafka-inlong`  2. Kafka连接器使用: `kafka-inlong` |
 | topic | 可选 | (none) | String | 当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 `topic-1;topic-2`。注意,对 source 表而言,`topic` 和 `topic-pattern` 两个选项只能使用其中一个。 |
 | topic-pattern | 可选 | (none) | String | 匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。注意,对 source 表而言,`topic` 和 `topic-pattern` 两个选项只能使用其中一个。 |
 | properties.bootstrap.servers | 必选 | (none) | String | 逗号分隔的 Kafka broker 列表。 |
@@ -108,6 +108,7 @@ TODO: 将在未来支持此功能。
 | scan.startup.specific-offsets | 可选 | (none) | String | 在使用 'specific-offsets' 启动模式时为每个 partition 指定 offset,例如 'partition:0,offset:42;partition:1,offset:300'。 |
 | scan.startup.timestamp-millis | 可选 | (none) | Long | 在使用 'timestamp' 启动模式时指定启动的时间戳(单位毫秒)。 |
 | scan.topic-partition-discovery.interval | 可选 | (none) | Duration | Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。 |
+| inlong.metric | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。|
 | sink.ignore.changelog | 可选 | false | 布尔型 | 支持所有类型的 changelog 流 ingest 到 Kafka。 |
 
 ## 可用的元数据字段
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mongodb-cdc.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mongodb-cdc.md
index b9dbe1fd5b..8a0a36f352 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mongodb-cdc.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mongodb-cdc.md
@@ -90,7 +90,7 @@ Flink SQL> CREATE TABLE mongodb_extract_node (
   suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
   PRIMARY KEY(_id) NOT ENFORCED
 ) WITH (
-  'connector' = 'mongodb-cdc',
+  'connector' = 'mongodb-cdc-inlong',
   'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
   'username' = 'flinkuser',
   'password' = 'flinkpw',
@@ -118,7 +118,7 @@ TODO: 未来会支持
 
 | **选项**                  | **是否必须** | **默认**   | **类型** | **描述**                                                     |
 | ------------------------- | ------------ | ---------- | -------- | ------------------------------------------------------------ |
-| connector                 | 必须         | (none)     | String   | 指定要使用的连接器,这里应该是`mongodb-cdc`.                 |
+| connector                 | 必须         | (none)     | String   | 指定要使用的连接器,这里应该是`mongodb-cdc-inlong`.                 |
 | hosts                     | 必须         | (none)     | String   | MongoDB 服务器的主机名和端口对的逗号分隔列表。例如。`localhost:27017,localhost:27018` |
 | username                  | 可选         | (none)     | String   | 连接到 MongoDB 时要使用的数据库用户的名称。仅当 MongoDB 配置为使用身份验证时才需要这样做。 |
 | password                  | 可选         | (none)     | String   | 连接 MongoDB 时使用的密码。仅当 MongoDB 配置为使用身份验证时才需要这样做。 |
@@ -134,6 +134,7 @@ TODO: 未来会支持
 | poll.max.batch.size       | 可选         | 1000       | Integer  | 轮询新数据时,单个批次中包含的最大更改流文档数。             |
 | poll.await.time.ms        | 可选         | 1500       | Integer  | 在更改流上检查新结果之前等待的时间量。                       |
 | heartbeat.interval.ms     | 可选         | 0          | Integer  | 发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。    |
+| inlong.metric             | 可选         | (none)     | String   | inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。|
 
 ## 可用元数据
 
@@ -160,7 +161,7 @@ CREATE TABLE `mysql_extract_node` (
     suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
     PRIMARY KEY(_id) NOT ENFORCED
 ) WITH (
-      'connector' = 'mongodb-cdc', 
+      'connector' = 'mongodb-cdc-inlong', 
       'hostname' = 'YourHostname',
       'username' = 'YourUsername',
       'password' = 'YourPassword',
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md
index e9dcb9e169..46b6e03b7a 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/mysql-cdc.md
@@ -300,6 +300,13 @@ TODO: 将在未来支持此功能。
           例如:<code>'debezium.snapshot.mode' = 'never'</code>。
           详细了解 <a href="https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties">Debezium 的 MySQL 连接器属性。</a></td> 
     </tr>
+    <tr>
+      <td>inlong.metric</td>
+      <td>可选</td>
+      <td style={{wordWrap: 'break-word'}}>(none)</td>
+      <td>String</td>
+      <td>inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。</td> 
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/oracle-cdc.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/oracle-cdc.md
index 15dcfee77c..565f3b4d53 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/oracle-cdc.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/oracle-cdc.md
@@ -201,7 +201,7 @@ Oracle Extract 节点允许从 Oracle 数据库中读取快照数据和增量数
 
 Oracle Extract 节点可以定义如下:
 
-```sql 
+```sql
 -- 创建 an Oracle Extract 节点 'products' in Flink SQL
 Flink SQL> CREATE TABLE products (
      ID INT NOT NULL,
@@ -210,7 +210,7 @@ Flink SQL> CREATE TABLE products (
      WEIGHT DECIMAL(10, 3),
      PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
-     'connector' = 'oracle-cdc',
+     'connector' = 'oracle-cdc-inlong',
      'hostname' = 'localhost',
      'port' = '1521',
      'username' = 'flinkuser',
@@ -252,7 +252,7 @@ TODO: 将在未来支持此功能。
       <td>required</td>
       <td style={{wordWrap: 'break-word'}}>(none)</td>
       <td>String</td>
-      <td>指定要使用的连接器,这里应该是 <code>'oracle-cdc'</code>。</td>
+      <td>指定要使用的连接器,这里应该是 <code>'oracle-cdc-inlong'</code>。</td>
     </tr>
     <tr>
       <td>hostname</td>
@@ -322,6 +322,13 @@ Oracle CDC 消费者的可选启动模式,有效枚举为"initial"
           例如:<code>'debezium.snapshot.mode' = 'never'</code>。
           详细了解 <a href="https://debezium.io/documentation/reference/1.5/connectors/oracle.html#oracle-connector-properties">Debezium 的 Oracle 连接器属性</a></td> 
      </tr>
+     <tr>
+       <td>inlong.metric</td>
+       <td>可选</td>
+       <td style={{wordWrap: 'break-word'}}>(none)</td>
+       <td>String</td>
+       <td>inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。</td> 
+     </tr>
     </tbody>
 </table>    
 </div>
@@ -388,7 +395,7 @@ CREATE TABLE products (
     WEIGHT DECIMAL(10, 3),
     PRIMARY KEY(id) NOT ENFORCED
 ) WITH (
-    'connector' = 'oracle-cdc',
+    'connector' = 'oracle-cdc-inlong',
     'hostname' = 'localhost',
     'port' = '1521',
     'username' = 'flinkuser',
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/postgresql-cdc.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/postgresql-cdc.md
index b561778c57..7ad1f01bcb 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/postgresql-cdc.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/postgresql-cdc.md
@@ -78,7 +78,7 @@ CREATE TABLE `postgresTable`(
   `name` STRING,
   `age` INT
 ) WITH (
-  'connector' = 'postgres-cdc',
+  'connector' = 'postgres-cdc-inlong',
   'hostname' = 'localhost',
   'username' = 'postgres',
   'password' = 'inlong',
@@ -103,7 +103,7 @@ TODO: 将在未来支持此功能。
 
 | 参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
 |---------|----------|---------|------|------------|
-| connector | 必选 | (none) | String | 指定使用的连接器,这是设置 `postgres-cdc`.|
+| connector | 必选 | (none) | String | 指定使用的连接器,这是设置 `postgres-cdc-inlong`.|
 | hostname | 必选 | (none) | String | PostgreSQL 数据库的 IP 地址或者主机名 |
 | username | 必选 | (none) | String | 连接到 PostgreSQL 数据库服务器时要使用的 PostgreSQL 数据库的名称。 |
 | password | 必选 | (none) | String |  |
@@ -114,6 +114,7 @@ TODO: 将在未来支持此功能。
 | decoding.plugin.name | 可选 | decoderbufs | String | 服务器上安装的 Postgres 逻辑解码插件的名称。 支持的值是 decoderbufs、wal2json、wal2json_rds、wal2json_streaming、wal2json_rds_streaming 和 pgoutput。 |
 | slot.name | 可选 | flink | String | PostgreSQL 逻辑解码槽的名称,它是为从特定数据库/模式的特定插件流式传输更改而创建的。 服务器使用此插槽将事件流式传输到您正在配置的连接器。 插槽名称必须符合 PostgreSQL 复制插槽命名规则,其中规定:“每个复制插槽都有一个名称,可以包含小写字母、数字和下划线字符。” |
 | debezium.* | 可选 | (none) | String | 将 Debezium 的属性传递给用于从 Postgres 服务器捕获数据更改的 Debezium Embedded Engine。 例如:“debezium.snapshot.mode”=“never”。 查看更多关于 [Debezium 的 Postgres 连接器属性](https://debezium.io/documentation/reference/1.5/connectors/postgresql.html#postgresql-connector-properties)。 |
+| inlong.metric | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。|
 
 **Note**: `slot.name` 建议为不同的表设置以避免潜在的 PSQLException: ERROR: replication slot "flink" is active for PID 974 error。  
 **Note**: PSQLException: ERROR: all replication slots are in use Hint: Free one or increase max_replication_slots. 我们可以通过以下语句删除槽。  
@@ -144,7 +145,7 @@ CREATE TABLE postgresTable (
     `name` STRING,
     `age` INT
 ) WITH (
-     'connector' = 'postgres-cdc',
+     'connector' = 'postgres-cdc-inlong',
      'hostname' = 'localhost',
      'username' = 'postgres',
      'password' = 'inlong',
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/pulsar.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/pulsar.md
index 6b8e6607e3..6cc1450a3a 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/pulsar.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/pulsar.md
@@ -44,7 +44,7 @@ Step.1 准备好 sql 客户端
 
 Step.2 从Pulsar读取数据
 
-```
+```sql
 CREATE TABLE pulsar (
   `physical_1` STRING,
   `physical_2` INT,
@@ -55,7 +55,7 @@ CREATE TABLE pulsar (
   `key` STRING ,
   `physical_3` BOOLEAN
 ) WITH (
-  'connector' = 'pulsar',
+  'connector' = 'pulsar-inlong',
   'topic' = 'persistent://public/default/topic82547611',
   'key.format' = 'raw',
   'key.fields' = 'key',
@@ -106,6 +106,7 @@ TODO
 | key.fields-prefix             | 可选     | (none)        | String | 为 key 格式的所有字段定义自定义前缀,以避免与 value 格式的字段名称冲突。默认情况下,前缀为空。如果定义了自定义前缀,`key.fields`则使用表架构和。 |
 | format or value.format        | 必需     | (none)        | String | 使用前缀设置名称。当以键格式构造数据类型时,前缀被移除,并且在键格式中使用非前缀名称。Pulsar 消息值序列化格式,支持 JSON、Avro 等。更多信息请参见 Flink 格式。 |
 | value.fields-include          | 可选     | ALL           | Enum   | Pulsar 消息值包含字段策略、可选的 ALL 和 EXCEPT_KEY。        |
+| inlong.metric | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。|
 
 ## 可用元数据
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/sqlserver-cdc.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/sqlserver-cdc.md
index 0fd95275c4..d33648a996 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/sqlserver-cdc.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/extract_node/sqlserver-cdc.md
@@ -90,7 +90,7 @@ Flink SQL> CREATE TABLE sqlserver_extract_node (
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
      ) WITH (
-     'connector' = 'sqlserver-cdc',
+     'connector' = 'sqlserver-cdc-inlong',
      'hostname' = 'YourHostname',
      'port' = 'port', --default:1433
      'username' = 'YourUsername',
@@ -127,7 +127,7 @@ TODO
       <td>必须</td>
       <td style={{wordWrap: 'break-word'}}>(none)</td>
       <td>String</td>
-      <td>指定使用什么连接器,这里应该是 'sqlserver-cdc'。</td>
+      <td>指定使用什么连接器,这里应该是 'sqlserver-cdc-inlong'。</td>
     </tr>
     <tr>
       <td>hostname</td>
@@ -185,6 +185,13 @@ TODO
       <td>String</td>
       <td>SQLServer 数据库连接配置时区。 例如: "Asia/Shanghai"。</td>
     </tr>
+    <tr>
+      <td>inlong.metric</td>
+      <td>可选</td>
+      <td style={{wordWrap: 'break-word'}}>(none)</td>
+      <td>String</td>
+      <td>inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。</td> 
+     </tr>
     </tbody>
 </table>
 </div>
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/clickhouse.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/clickhouse.md
index 5490af5723..5aba3a0aab 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/clickhouse.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/clickhouse.md
@@ -97,6 +97,7 @@ TODO: 将在未来支持此功能。
 | sink.buffer-flush.interval | 可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
 | sink.max-retries | 可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
 | sink.parallelism | 可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
+| inlong.metric | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。|
 
 ## 数据类型映射
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/elasticsearch.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/elasticsearch.md
index e73753dc6c..098e86a6b6 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/elasticsearch.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/elasticsearch.md
@@ -238,6 +238,13 @@ TODO: 将在未来支持这个特性。
        默认使用内置的 <code>'json'</code> 格式。更多详细信息,请参阅 <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/overview/">JSON Format</a> 页面。
       </td>
     </tr>
+    <tr>
+      <td>inlong.metric</td>
+      <td>可选</td>
+      <td style={{wordWrap: 'break-word'}}>(none)</td>
+      <td>String</td>
+      <td>inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。</td> 
+     </tr>
     </tbody>
 </table>
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/greenplum.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/greenplum.md
index 4616ea3c77..668ac9c02a 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/greenplum.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/greenplum.md
@@ -95,6 +95,7 @@ TODO: 将在未来支持此功能。
 | sink.buffer-flush.interval | 可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
 | sink.max-retries | 可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
 | sink.parallelism | 可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
+| inlong.metric | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。|
 
 ## 数据类型映射
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hbase.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hbase.md
index e61bc4bc3a..644f2fdcfe 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hbase.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hbase.md
@@ -49,7 +49,7 @@ CREATE TABLE hbase_load_node (
     family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
     PRIMARY KEY (rowkey) NOT ENFORCED
 ) WITH (
-      'connector' = 'hbase-2.2',
+      'connector' = 'hbase-2.2-inlong',
       'table-name' = 'mytable',
       'zookeeper.quorum' = 'localhost:2181'
 );
@@ -80,7 +80,7 @@ TODO: 将在未来支持此功能。
 
 | 参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
 |-----|---------|-------|---------|-----|
-| connector | 必选 | (none) | String | 指定使用的连接器: hbase-2.2: 连接 HBase 2.2.x 集群 |
+| connector | 必选 | (none) | String | 指定使用的连接器: hbase-2.2-inlong: 连接 HBase 2.2.x 集群 |
 | table-name | 必选 | (none) | String | 连接的 HBase 表名。 |
 | zookeeper.quorum | 必选 | (none) | String | HBase Zookeeper quorum 信息。 |
 | zookeeper.znode.parent | 可选 | /hbase | String | HBase 集群的 Zookeeper 根目录。|
@@ -94,6 +94,7 @@ TODO: 将在未来支持此功能。
 | lookup.cache.ttl | 可选 | (none) | Duration | 查找缓存中每一行的最大生存时间,在这段时间内,最老的行将过期。注意:"lookup.cache.max-rows" 和 "lookup.cache.ttl" 必须同时被设置。默认情况下,查找缓存是禁用的。 |
 | lookup.max-retries | 可选 | 3 | Integer | 查找数据库失败时的最大重试次数。 |
 | properties.* | 可选 | (none) | String | 可以设置任意 HBase 的配置项。后缀名必须匹配在 [HBase 配置文档](https://hbase.apache.org/2.3/book.html#hbase_default_configurations) 中定义的配置键。Flink 将移除 "properties." 配置键前缀并将变换后的配置键和值传入底层的 HBase 客户端。 例如您可以设置 'properties.hbase.security.authentication' = 'kerberos' 等kerberos认证参数。 |
+| inlong.metric | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。|
 
 ## 数据类型映射
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hdfs.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hdfs.md
index a3efd26218..7e48fd2896 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hdfs.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hdfs.md
@@ -23,7 +23,7 @@ CREATE TABLE hdfs_load_node (
   dt STRING,
  `hour` STRING
   ) PARTITIONED BY (dt, `hour`) WITH (
-    'connector'='filesystem',
+    'connector'='filesystem-inlong',
     'path'='...',
     'format'='orc',
     'sink.partition-commit.delay'='1 h',
@@ -108,6 +108,13 @@ CREATE TABLE hdfs_load_node (
       <td>String</td>
       <td>合并目标文件大小,默认值为滚动文件大小。</td>
     </tr>
+    <tr>
+      <td>inlong.metric</td>
+      <td>可选</td>
+      <td style={{wordWrap: 'break-word'}}>(none)</td>
+      <td>String</td>
+      <td>inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。</td> 
+    </tr>
     </tbody>
 </table>
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hive.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hive.md
index 6fcd879a59..5f11e74986 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hive.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/hive.md
@@ -127,6 +127,13 @@ TODO: 未来版本支持
       custom:通过指定的类来创建提交策略, 
       支持同时指定多个提交策略:'metastore,success-file'。</td>
     </tr>
+    <tr>
+      <td>inlong.metric</td>
+      <td>可选</td>
+      <td style={{wordWrap: 'break-word'}}>(none)</td>
+      <td>String</td>
+      <td>inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。</td> 
+     </tr>
     </tbody>
 </table>
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/iceberg.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/iceberg.md
index 3f932c631a..930605b239 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/iceberg.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/iceberg.md
@@ -163,6 +163,7 @@ TODO
 | clients          | hive catalog可选                 | 2      | Integer | Hive Metastore 客户端池大小,默认值为 2                      |
 | warehouse        | hive catalog或hadoop catalog可选 | (none) | String  | 对于 Hive catalog,表示 Hive 仓库位置;如果既没有通过 `hive-conf-dir` 指定包含 `hive-site.xml` 的目录,也没有把正确的 `hive-site.xml` 放入 classpath,用户应显式指定此路径。对于 Hadoop catalog,表示存放元数据文件和数据文件的 HDFS 目录 |
 | hive-conf-dir    | hive catalog可选                 | (none) | String  | 包含 `hive-site.xml` 配置文件的目录路径,用于提供自定义的 Hive 配置值。如果在创建 Iceberg catalog 时同时设置了 `warehouse` 和 `hive-conf-dir`,则 `<hive-conf-dir>/hive-site.xml`(或 classpath 中的 hive 配置文件)里 `hive.metastore.warehouse.dir` 的值将被 `warehouse` 覆盖 |
+| inlong.metric | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。|
 
 ## 数据类型映射
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md
index 04800c5a0b..37ce5f30d3 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md
@@ -57,7 +57,7 @@ Flink SQL> CREATE TABLE kafka_load_node (
          `name` STRING,
            PRIMARY KEY (`id`) NOT ENFORCED
           ) WITH (
-          'connector' = 'upsert-kafka',
+          'connector' = 'upsert-kafka-inlong',
           'topic' = 'user',
           'properties.bootstrap.servers' = 'localhost:9092',
           'key.format' = 'csv',
@@ -78,7 +78,7 @@ TODO: 将在未来支持此功能。
 
 | 参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
 |---------|----------|---------|------|------------|
-| connector | 必选 | (none) | String | 指定要使用的连接器  1. Upsert Kafka 连接器使用: `upsert-kafka`  2. Kafka连接器使用: `kafka-inlong` |
+| connector | 必选 | (none) | String | 指定要使用的连接器  1. Upsert Kafka 连接器使用: `upsert-kafka-inlong`  2. Kafka连接器使用: `kafka-inlong` |
 | topic | 必选 | (none) | String | 当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 `topic-1;topic-2`。注意,对 source 表而言,`topic` 和 `topic-pattern` 两个选项只能使用其中一个。 |
 | properties.bootstrap.servers | 必选 | (none) | String | 逗号分隔的 Kafka broker 列表。 |
 | properties.* | 可选 | (none) | String | 可以设置和传递任意 Kafka 的配置项。后缀名必须匹配在 [Kafka 配置文档](https://kafka.apache.org/documentation/#configuration) 中定义的配置键。Flink 将移除 "properties." 配置键前缀并将变换后的配置键和值传入底层的 Kafka 客户端。例如,你可以通过 'properties.allow.auto.create.topics' = 'false' 来禁用 topic 的自动创建。但是某些配置项不支持进行配置,因为 Flink 会覆盖这些配置,例如 'key.deserializer' 和 'value.deserializer'。 |
@@ -91,6 +91,7 @@ TODO: 将在未来支持此功能。
 | sink.partitioner | 可选 | 'default' | String | Flink partition 到 Kafka partition 的分区映射关系,可选值有:<br/>default:使用 Kafka 默认的分区器对消息进行分区。<br/>fixed:每个 Flink partition 最终对应最多一个 Kafka partition。<br/>round-robin:Flink partition 按轮循(round-robin)的模式对应到 Kafka partition。只有当未指定消息的消息键时生效。<br/>自定义 FlinkKafkaPartitioner 的子类:例如 'org.mycompany.MyPartitioner'。请参阅 [Sink 分区](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/kafka/#sink-%E5%88%86%E5%8C%BA) 以获取更多细节。 |
 | sink.semantic | 可选 | at-least-once | String | 定义 Kafka sink 的语义。有效值为 'at-least-once','exactly-once' 和 'none'。请参阅 [一致性保证](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/kafka/#%E4%B8%80%E8%87%B4%E6%80%A7%E4%BF%9D%E8%AF%81) 以获取更多细节。 |
 | sink.parallelism | 可选 | (none) | Integer | 定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。 |
+| inlong.metric | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。|
 
 ## 可用的元数据字段
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/mysql.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/mysql.md
index 9047b70f07..b64e43bcbb 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/mysql.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/mysql.md
@@ -95,6 +95,7 @@ TODO: 将在未来支持此功能。
 | sink.buffer-flush.interval | 可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
 | sink.max-retries | 可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
 | sink.parallelism | 可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
+| inlong.metric | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。|
 
 ## 数据类型映射
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/oracle.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/oracle.md
index 450e8b7bc2..98f6db1b4f 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/oracle.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/oracle.md
@@ -94,6 +94,7 @@ TODO: 将在未来支持此功能。
 | sink.buffer-flush.interval | 可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
 | sink.max-retries | 可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
 | sink.parallelism | 可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
+| inlong.metric | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。|
 
 ## 数据类型映射
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/postgresql.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/postgresql.md
index 0e4f54bce5..918d669276 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/postgresql.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/postgresql.md
@@ -58,7 +58,8 @@ CREATE TABLE `postgresql_load_table`(
   `name` STRING,
   `age` INT
 ) WITH (
-  'connector' = 'jdbc',
+  'connector' = 'jdbc-inlong',
+  'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.PostgresDialect',
   'url' = 'jdbc:postgresql://localhost:5432/write',
   'username' = 'inlong',
   'password' = 'inlong',
@@ -94,6 +95,7 @@ TODO: 将在未来支持此功能。
 | sink.buffer-flush.interval | 可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
 | sink.max-retries | 可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
 | sink.parallelism | 可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
+| inlong.metric | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。|
 
 ## 数据类型映射
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/sqlserver.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/sqlserver.md
index 21f4424fcb..a317406963 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/sqlserver.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/sqlserver.md
@@ -93,6 +93,7 @@ TODO: 将在未来支持此功能。
 | sink.buffer-flush.interval | 可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
 | sink.max-retries | 可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
 | sink.parallelism | 可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
+| inlong.metric | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。|
 
 ## 数据映射
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/tdsql-postgresql.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/tdsql-postgresql.md
index d558538d1c..583601884d 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/tdsql-postgresql.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/tdsql-postgresql.md
@@ -93,6 +93,7 @@ TODO: 将在未来支持此功能。
 | sink.buffer-flush.interval | 可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
 | sink.max-retries | 可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
 | sink.parallelism | 可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
+| inlong.metric | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId&streamId&nodeId。|
 
 ## 数据类型映射
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/sort/example.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/sort/example.md
index 5800e9835d..20e07b7c68 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/sort/example.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/sort/example.md
@@ -1,5 +1,5 @@
 ---
-title: Example
+title: 例子
 sidebar_position: 3
 ---
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/sort/metrics.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/sort/metrics.md
new file mode 100644
index 0000000000..7f05d6e7cf
--- /dev/null
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/sort/metrics.md
@@ -0,0 +1,101 @@
+---
+title: 监控指标
+sidebar_position: 4
+---
+
+## 概览
+
+我们为各个节点增加了指标计算。用户在建表语句的 WITH 选项中添加 `inlong.metric` 后,Sort 会计算相应指标;`inlong.metric` 选项的值由三部分构成:`groupId&streamId&nodeId`。
+用户可以使用 [metric reporter](https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/deployment/metric_reporters/) 去上报数据。
+
+## 指标
+
+### 支持的 extract 节点
+
+| 指标名 | extract 节点 | 描述 |
+|-------------|--------------|-------------|
+| groupId_streamId_nodeId_numRecordsIn | kafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc | 输入记录数 |
+| groupId_streamId_nodeId_numBytesIn | kafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc | 输入字节数 |
+| groupId_streamId_nodeId_numRecordsInPerSecond | kafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc | 每秒输入记录数 |
+| groupId_streamId_nodeId_numBytesInPerSecond | kafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc | 每秒输入字节数 |
+
+### 支持的 load 节点
+
+| 指标名 | load 节点 | 描述 |
+|-------------|-----------|-------------|
+| groupId_streamId_nodeId_numRecordsOut | clickhouse,elasticsearch,greenplum,hbase,<br/>hdfs,hive,iceberg,kafka,<br/>mysql,oracle,postgresql,sqlserver,tdsql-postgresql | 输出记录数 |
+| groupId_streamId_nodeId_numBytesOut |  clickhouse,elasticsearch,greenplum,hbase,<br/>hdfs,hive,iceberg,kafka,<br/>mysql,oracle,postgresql,sqlserver,tdsql-postgresql | 输出字节数 |
+| groupId_streamId_nodeId_numRecordsOutPerSecond |  clickhouse,elasticsearch,greenplum,<br/>hbase,hdfs,hive,iceberg,<br/>kafka,mysql,oracle,postgresql,sqlserver,tdsql-postgresql | 每秒输出记录数 |
+| groupId_streamId_nodeId_numBytesOutPerSecond |  clickhouse,elasticsearch,greenplum,<br/>hbase,hdfs,hive,iceberg,kafka,<br/>mysql,oracle,postgresql,sqlserver,tdsql-postgresql | 每秒输出字节数 |
+
+## 用法
+
+这里通过一个把 MySQL 数据同步到 PostgreSQL 的例子,介绍指标的使用方式。
+
+* Flink SQL 的使用
+```sql
+
+ CREATE TABLE `table_groupId_streamId_nodeId1`(
+     `id` INT,
+     `name` STRING,
+     `age` INT,
+     PRIMARY KEY(`id`) NOT ENFORCED)
+    WITH (
+        'connector' = 'mysql-cdc-inlong',
+        'hostname' = 'xxxx',
+        'username' = 'xxx',
+        'password' = 'xxx',
+        'database-name' = 'test',
+        'scan.incremental.snapshot.enabled' = 'true',
+        'server-time-zone' = 'GMT+8',
+        'table-name' = 'user',
+        'inlong.metric' = 'mysqlGroup&mysqlStream&mysqlNode1'
+);
+
+ CREATE TABLE `table_groupId_streamId_nodeId2`(
+     `id` INT,
+     `name` STRING,
+     `age` INT,
+     PRIMARY KEY (`id`) NOT ENFORCED)
+     WITH (
+         'connector' = 'jdbc-inlong',
+         'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.PostgresDialect',
+         'url' = 'jdbc:postgresql://ip:5432/postgres',
+         'username' = 'postgres',
+         'password' = 'inlong',
+         'table-name' = 'public.user',
+         'inlong.metric' = 'pgGroup&pgStream&pgNode'
+         );
+
+ INSERT INTO `table_groupId_streamId_nodeId2`
+ SELECT
+     `id`,
+     `name`,
+     `age`
+ FROM `table_groupId_streamId_nodeId1`;
+```
+
+* 我们可以在 flink-conf.yaml 中添加 metric reporter 配置
+
+```yaml
+metrics.reporters: promgateway
+metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
+metrics.reporter.promgateway.host: ip
+metrics.reporter.promgateway.port: 9091
+metrics.reporter.promgateway.interval: 60 SECONDS
+```
+上面的 `ip` 和 `port` 需要替换为你所部署的 [pushgateway](https://github.com/prometheus/pushgateway/releases) 的地址和端口。
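+
+如果希望进一步确认这些指标已经被 Prometheus 抓取,可以参考下面这段 `prometheus.yml` 配置片段。这只是一个假设场景下的最小示例:其中的 `ip:9091` 指向上文假设的 pushgateway 地址,`job_name` 可以任意取名。
+
+```yaml
+# 最小示例:让 Prometheus 从 pushgateway 拉取指标(地址为假设值)
+scrape_configs:
+  - job_name: 'inlong-sort-pushgateway'
+    # 保留推送时携带的 job、instance 等标签,避免被抓取端覆盖
+    honor_labels: true
+    static_configs:
+      - targets: ['ip:9091']
+```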
+
+* 执行上面的 SQL 后,我们可以访问 pushgateway 的 URL:http://ip:port
+
+当我们使用的 metric reporter 是 `org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter` 时,指标名将添加前缀 `flink_taskmanager_job_task_operator`。  
+我们可以看到完整的指标名如下:    
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsIn`,  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesIn`,  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsInPerSecond`,  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesInPerSecond`,  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOut`,  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOut`,  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOutPerSecond`,  
+ `flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOutPerSecond`.
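+
+以上指标名中的 `groupId`、`streamId`、`nodeId` 会被替换为 `inlong.metric` 中配置的实际取值。以上文示例中 MySQL Extract 节点的 `'inlong.metric' = 'mysqlGroup&mysqlStream&mysqlNode1'` 为例,推测对应的输入记录数指标名为 `flink_taskmanager_job_task_operator_mysqlGroup_mysqlStream_mysqlNode1_numRecordsIn`。下面是一个假设的 Prometheus recording rule 片段,仅用于演示如何引用该指标:
+
+```yaml
+# 假设的 recording rule:把示例节点的输入记录数指标重命名为更短的名字
+groups:
+  - name: inlong-sort-demo
+    rules:
+      - record: inlong_sort:mysql_node1:num_records_in
+        expr: flink_taskmanager_job_task_operator_mysqlGroup_mysqlStream_mysqlNode1_numRecordsIn
+```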
+
+