Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/06/13 02:52:18 UTC

[GitHub] [incubator-inlong-website] yunqingmoswu commented on a diff in pull request #392: [INLONG-391][Sort] Add some data node detail doc

yunqingmoswu commented on code in PR #392:
URL: https://github.com/apache/incubator-inlong-website/pull/392#discussion_r895282128


##########
docs/data_node/extract_node/kafka.md:
##########
@@ -3,6 +3,157 @@ title: Kafka
 sidebar_position: 4
 ---
 
-## Configuration
-the Dashboard has not supported extracting data from Kafka for this version, 
-you can create Kafka data streams from the background via the [Command-line Tools](user_guide/command_line_tools.md).
\ No newline at end of file
+## Kafka Extract Node
+
+The `Kafka Extract Node` supports reading data from Kafka topics. It can read data both in the normal fashion and in the
+upsert fashion. The `upsert-kafka` connector produces a `changelog stream`, where each data record represents an `update` or
+`delete` event. The `kafka-inlong` connector can read data and metadata.
+
+## Supported Version
+
+| Extract Node                | Kafka version |                                                                                                                                                                                                                                                                                                                                                                                           
+|-----------------------------|---------------|
+| [Kafka](./kafka.md)         | universal     |  
+
+## Dependencies  
+
+In order to set up the `Kafka Extract Node`, the following provides dependency information for both projects using a
+build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.
+
+### Maven dependency
+
+```xml
+<dependency>
+    <groupId>org.apache.inlong</groupId>
+    <artifactId>sort-connector-kafka</artifactId>
+    <!-- Choose the version that suits your application -->
+    <version>inlong_version</version>
+</dependency>
+```
+
+## How to create a Kafka Extract Node
+
+### Usage for SQL API
+
+The example below shows how to create a Kafka Extract Node with `Flink SQL`:
+* connector is `kafka-inlong`
+```sql
+-- Set checkpoint every 3000 milliseconds                       
+Flink SQL> SET 'execution.checkpointing.interval' = '3s';   
+
+-- Create a Kafka table 'kafka_extract_node' in Flink SQL
+Flink SQL> CREATE TABLE kafka_extract_node (
+           `id` INT,
+           `name` STRING
+           ) WITH (
+           'connector' = 'kafka-inlong',
+           'topic' = 'user',
+           'properties.bootstrap.servers' = 'localhost:9092',
+           'properties.group.id' = 'testGroup',
+           'scan.startup.mode' = 'earliest-offset',
+           'format' = 'csv'
+           );
+  
+-- Read data
+Flink SQL> SELECT * FROM kafka_extract_node;
+```
+* connector is `upsert-kafka`
+```sql
+-- Set checkpoint every 3000 milliseconds                       
+Flink SQL> SET 'execution.checkpointing.interval' = '3s';
+
+-- Create a Kafka table 'kafka_extract_node' in Flink SQL
+Flink SQL> CREATE TABLE kafka_extract_node (
+          `id` INT,
+          `name` STRING,
+           PRIMARY KEY (`id`) NOT ENFORCED
+          ) WITH (
+          'connector' = 'upsert-kafka',
+          'topic' = 'user',
+          'properties.bootstrap.servers' = 'localhost:9092',
+          'properties.group.id' = 'testGroup',
+          'scan.startup.mode' = 'earliest-offset',
+          'key.format' = 'csv',
+          'value.format' = 'csv'
+          );
+    
+-- Read data
+Flink SQL> SELECT * FROM kafka_extract_node;       
+```
+
+### Usage for InLong Dashboard
+
+TODO: It will be supported in the future.
+
+### Usage for InLong Manager Client
+
+TODO: It will be supported in the future.
+
+## Kafka Extract Node Options
+
+| Option | Required | Default | Type | Description |
+|---------|----------|---------|------|------------|
+| connector | required | (none) | String | Specify which connector to use, valid values are: 1. for upsert Kafka use `upsert-kafka` 2. for normal Kafka use `kafka-inlong` |
+| topic | optional | (none) | String | Topic name(s) to read data from when the table is used as a source. It also supports topic lists for sources, with topics separated by semicolons, like `topic-1;topic-2`. Note, only one of `topic-pattern` and `topic` can be specified for sources. |
+| topic-pattern | optional | (none) | String | The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of `topic-pattern` and `topic` can be specified for sources. |
+| properties.bootstrap.servers | required | (none) | String | Comma separated list of Kafka brokers. |
+| properties.group.id | required | (none) | String | The id of the consumer group for Kafka source. |
+| properties.* | optional | (none) | String | This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in the Kafka configuration documentation. Flink will remove the `properties.` key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via `properties.allow.auto.create.topics` = `false` (see the sketch after this table). But some configurations cannot be set this way, because Flink will override them, e.g. `key.deserializer` and `value.deserializer`. |
+| format | required for normal Kafka | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the [Formats](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/) page for more details and more format options. Note: Either this option or the `value.format` option is required. |
+| key.format | optional | (none) | String | The format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Note: If a key format is defined, the 'key.fields' option is required as well. Otherwise the Kafka records will have an empty key. |
+| key.fields | optional | [] | `List<String>` | Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined. The list should look like 'field1;field2'. |
+| key.fields-prefix | optional | (none) | String | Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and 'key.fields' will work with prefixed names. When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format. Please note that this option requires that 'value.fields-include' must be set to 'EXCEPT_KEY'. |
+| value.fields-include | optional | ALL | Enum, possible values: [ALL, EXCEPT_KEY] | Defines a strategy for how to deal with key columns in the data type of the value format. By default, 'ALL' physical columns of the table schema will be included in the value format, which means that key columns appear in the data type for both the key and value format. |
+| scan.startup.mode | optional | group-offsets | String | Startup mode for Kafka consumer, valid values are 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'. See the following Start Reading Position for more details. |
+| scan.startup.specific-offsets | optional | (none) | String | Specify offsets for each partition in case of 'specific-offsets' startup mode, e.g. 'partition:0,offset:42;partition:1,offset:300'. |
+| scan.startup.timestamp-millis | optional | (none) | Long | Start from the specified epoch timestamp (milliseconds) used in case of 'timestamp' startup mode. |
+| scan.topic-partition-discovery.interval | optional | (none) | Duration | Interval for consumer to discover dynamically created Kafka topics and partitions periodically. |
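+
+For illustration, the sketch below combines several of the options above: it starts reading from explicit per-partition offsets via `scan.startup.specific-offsets` and passes an extra Kafka client property through the `properties.*` prefix. The topic, bootstrap servers, and offset values are placeholders.
+
+```sql
+CREATE TABLE kafka_extract_node (
+    `id` INT,
+    `name` STRING
+) WITH (
+    'connector' = 'kafka-inlong',
+    'topic' = 'user',
+    'properties.bootstrap.servers' = 'localhost:9092',
+    'properties.group.id' = 'testGroup',
+    -- passed through to the Kafka client with the `properties.` prefix removed
+    'properties.allow.auto.create.topics' = 'false',
+    -- start from explicit offsets instead of the committed group offsets
+    'scan.startup.mode' = 'specific-offsets',
+    'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
+    'format' = 'csv'
+)
+```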
+
+## Available Metadata
+
+The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition. Reading metadata is supported for the `canal-json-inlong` format.
+
+| key | Data Type | Description  | 
+|-----|------------|-------------|
+| value.table_name | STRING | Name of the table that contains the row  | 
+| value.database_name | STRING |  Name of the database that contains the row  |
+| value.op_ts| TIMESTAMP(3) | The time at which the change was made in the database. If the record is read from a snapshot of the table instead of the binlog, the value is always 0 |
+| value.op_type| STRING | Operation type, INSERT/UPDATE/DELETE |
+| value.batch_id| BIGINT | Not important, a simple increment counter |
+| value.is_ddl| BOOLEAN | The source does not emit DDL data, so the value is always false |
+| value.update_before| `ARRAY<MAP<STRING, STRING>>` | The update-before data for UPDATE record |
+| value.mysql_type | `MAP<STRING, STRING>` | MySQL field type |
+| value.pk_names | `ARRAY<STRING>` | Primary key |
+| value.sql_type | `MAP<STRING, INT>` | SQL field type |
+| value.ts | TIMESTAMP_LTZ(3) | The ts_ms field is used to store the information about the local time at which the connector processed/generated the event |
+
+The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
+
+```sql
+CREATE TABLE `kafka_extract_node` (
+      `id` INT,
+      `name` STRING,
+      `database_name` STRING METADATA FROM 'value.database_name',
+      `table_name`    STRING METADATA FROM 'value.table_name',
+      `op_ts`         TIMESTAMP(3) METADATA FROM 'value.op_ts',
+      `op_type` STRING METADATA FROM 'value.op_type',
+      `batch_id` BIGINT METADATA FROM 'value.batch_id',
+      `is_ddl` BOOLEAN METADATA FROM 'value.is_ddl',
+      `update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update_before',
+      `mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql_type',
+      `pk_names` ARRAY<STRING> METADATA FROM 'value.pk_names',
+      `data` STRING METADATA FROM 'value.data',
+      `sql_type` MAP<STRING, INT> METADATA FROM 'value.sql_type',
+      `ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ts'
+) WITH (
+      'connector' = 'kafka-inlong',
+      'topic' = 'user',
+      'properties.bootstrap.servers' = 'localhost:9092',
+      'properties.group.id' = 'testGroup',
+      'scan.startup.mode' = 'earliest-offset',
+      'format' = 'canal-json-inlong'
+)
+```
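+
+Given a table definition like the one above, the exposed metadata columns can be queried just like regular columns; a minimal read sketch:
+
+```sql
+-- Read the change events together with their Canal metadata
+SELECT `database_name`, `table_name`, `op_type`, `op_ts`, `id`, `name`
+FROM `kafka_extract_node`;
+```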
+
+## Data Type Mapping
+
+Kafka stores message keys and values as bytes, so Kafka doesn’t have schema or data types. The Kafka messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by specific formats. Please refer to [Formats](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/overview/) pages for more details.
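+
+For instance, switching the `format` option changes how the value bytes are mapped to SQL types. A sketch using the `json` format mentioned above (the table name, topic, and field types here are illustrative):
+
+```sql
+CREATE TABLE kafka_json_extract_node (
+    `id` BIGINT,
+    `price` DECIMAL(10, 2),
+    `order_time` TIMESTAMP(3)
+) WITH (
+    'connector' = 'kafka-inlong',
+    'topic' = 'orders',
+    'properties.bootstrap.servers' = 'localhost:9092',
+    'properties.group.id' = 'testGroup',
+    'scan.startup.mode' = 'earliest-offset',
+    -- the json format maps JSON numbers and strings onto the declared SQL types
+    'format' = 'json'
+)
+```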

Review Comment:
   Would it be better to just copy this section from the Flink docs here?



##########
docs/data_node/extract_node/mysql-cdc.md:
##########
@@ -14,7 +14,7 @@ The MySQL Extract Node allows for reading snapshot data and incremental data fro
 
 ## Dependencies
 
-In order to setup the MySQL Extract Node, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.

Review Comment:
   Why is it changed to `set up`?



##########
docs/data_node/extract_node/kafka.md:
##########
@@ -3,6 +3,157 @@ title: Kafka
 sidebar_position: 4
 ---
 
-## Configuration
-the Dashboard has not supported extracting data from Kafka for this version, 
-you can create Kafka data streams from the background via the [Command-line Tools](user_guide/command_line_tools.md).
\ No newline at end of file
+## Kafka Extract Node
+
+The `Kafka Extract Node` supports reading data from Kafka topics. It can read data both in the normal fashion and in the
+upsert fashion. The `upsert-kafka` connector produces a `changelog stream`, where each data record represents an `update` or
+`delete` event. The `kafka-inlong` connector can read data and metadata.
+
+## Supported Version
+
+| Extract Node                | Kafka version |                                                                                                                                                                                                                                                                                                                                                                                           
+|-----------------------------|---------------|
+| [Kafka](./kafka.md)         | universal     |  

Review Comment:
   The Kafka version should be kept consistent with the overview.


