Posted to commits@doris.apache.org by zy...@apache.org on 2023/06/09 03:52:28 UTC

[doris] branch master updated: [Feature](load)RoutineLoad support multi table load (#20307)

This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 019e2353d3 [Feature](load)RoutineLoad support multi table load (#20307)
019e2353d3 is described below

commit 019e2353d3fd1d4131d19df62844bd08194e5c09
Author: Calvin Kirs <ac...@163.com>
AuthorDate: Fri Jun 9 11:52:20 2023 +0800

    [Feature](load)RoutineLoad support multi table load (#20307)
    
    1. Support multi-table routine load
    2. Dynamically set table information for multi-table loads
    3. Add multi-table syntax rules (see the sketch below)
    4. Add new multi-table execution plan
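    
    A minimal sketch of the new multi-table syntax, based on the documentation examples added in this commit:
    omitting `ON tbl_name` makes the job a dynamic multi-table load, and each Kafka message carries its target
    table name, e.g. `user_address|{"user_id":128787321878,"address":"Los Angeles, CA, USA","timestamp":1589191587}`.
    
    ```sql
    CREATE ROUTINE LOAD example_db.test1
    PROPERTIES
    (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "property.group.id" = "xxx",
        "property.client.id" = "xxx",
        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
    ```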
---
 .../import/import-way/routine-load-manual.md       | 142 ++++++++++++++----
 .../Load/CREATE-ROUTINE-LOAD.md                    |  67 +++++++--
 .../Show-Statements/SHOW-ROUTINE-LOAD.md           |   3 +-
 .../import/import-way/routine-load-manual.md       |  89 ++++++++++-
 .../Load/CREATE-ROUTINE-LOAD.md                    |  66 ++++++--
 .../Show-Statements/SHOW-ROUTINE-LOAD.md           |   3 +-
 fe/fe-core/src/main/cup/sql_parser.cup             |   7 +-
 .../doris/analysis/AlterRoutineLoadStmt.java       |   2 +-
 .../doris/analysis/CreateRoutineLoadStmt.java      |  18 ++-
 .../apache/doris/analysis/ShowRoutineLoadStmt.java |   1 +
 .../routineload/AbstractDataSourceProperties.java  |  13 +-
 .../load/routineload/KafkaRoutineLoadJob.java      |  35 +++--
 .../doris/load/routineload/KafkaTaskInfo.java      |  24 +--
 .../RoutineLoadDataSourcePropertyFactory.java      |  18 ++-
 .../doris/load/routineload/RoutineLoadJob.java     | 116 +++++++++-----
 .../doris/load/routineload/RoutineLoadManager.java |  41 ++---
 .../load/routineload/RoutineLoadTaskInfo.java      |  10 +-
 .../load/routineload/RoutineLoadTaskScheduler.java |   1 -
 .../load/routineload/kafka/KafkaConfigType.java    |  32 ++++
 .../load/routineload/kafka/KafkaConfiguration.java |  11 +-
 .../kafka/KafkaDataSourceProperties.java           |  53 ++++++-
 .../apache/doris/planner/StreamLoadPlanner.java    |   8 +-
 .../java/org/apache/doris/qe/ShowExecutor.java     |  24 ++-
 .../apache/doris/service/FrontendServiceImpl.java  | 167 +++++++++++++++++++--
 .../doris/transaction/DatabaseTransactionMgr.java  |  38 ++++-
 .../apache/doris/transaction/TransactionState.java |  18 +--
 .../doris/analysis/CreateRoutineLoadStmtTest.java  |  68 +++++++++
 .../load/routineload/KafkaRoutineLoadJobTest.java  |   2 +-
 .../routineload/RoutineLoadTaskSchedulerTest.java  |   2 +-
 .../transaction/GlobalTransactionMgrTest.java      |   4 +-
 30 files changed, 904 insertions(+), 179 deletions(-)

diff --git a/docs/en/docs/data-operate/import/import-way/routine-load-manual.md b/docs/en/docs/data-operate/import/import-way/routine-load-manual.md
index 1c5fd2536a..7d304fae80 100644
--- a/docs/en/docs/data-operate/import/import-way/routine-load-manual.md
+++ b/docs/en/docs/data-operate/import/import-way/routine-load-manual.md
@@ -142,40 +142,126 @@ CREATE ROUTINE LOAD example_db.test1 ON example_tbl
 >
 >"strict_mode" = "true"
 
-3. Example of importing data in Json format
+[3. Example of importing data in Json format](#Example_of_importing_data_in_Json_format)
 
-   Routine Load only supports the following two types of json formats
+  Routine Load supports only the following two JSON formats:
 
-   The first one has only one record and is a json object.
+- The first has only one record, which is a JSON object:
 
-```json
-{"category":"a9jadhx","author":"test","price":895}
+When using **single-table import** (that is, specifying the table name via `ON TABLE_NAME`), the JSON data format is as follows.
+  ```
+  {"category":"a9jadhx","author":"test","price":895}
+  ```
+When using **dynamic/multi-table import** (i.e. not specifying a specific table name), the JSON data format is as follows.
+
+  ```
+  table_name|{"category":"a9jadhx","author":"test","price":895}
+  ```
+
+
+Assuming we need to import data into two tables, `user_address` and `user_info`, the message format is as follows.
+
+eg: user_address data format
+
+```
+user_address|{"user_id":128787321878,"address":"Los Angeles, CA, USA","timestamp":1589191587}
+```
+eg: user_info data format
+```
+user_info|{"user_id":128787321878,"name":"Zhang San","age":18,"timestamp":1589191587}
 ```
 
-The second one is a json array, which can contain multiple records
-
-```json
-[
-    {   
-        "category":"11",
-        "author":"4avc",
-        "price":895,
-        "timestamp":1589191587
-    },
-    {
-        "category":"22",
-        "author":"2avc",
-        "price":895,
-        "timestamp":1589191487
-    },
-    {
-        "category":"33",
-        "author":"3avc",
-        "price":342,
-        "timestamp":1589191387
-    }
-]
+- The second type is a JSON array that can contain multiple records.
+
+When using **single-table import** (that is, specifying the table name via `ON TABLE_NAME`), the JSON data format is as follows.
+
+   ```json
+   [
+       {   
+           "category":"11",
+           "author":"4avc",
+           "price":895,
+           "timestamp":1589191587
+       },
+       {
+           "category":"22",
+           "author":"2avc",
+           "price":895,
+           "timestamp":1589191487
+       },
+       {
+           "category":"33",
+           "author":"3avc",
+           "price":342,
+           "timestamp":1589191387
+       }
+   ]
+   ```
+When using **dynamic/multi-table import** (i.e. not specifying a specific table name), the JSON data format is as follows.
+
+```
+   table_name|[
+       {
+           "user_id":128787321878,
+           "address":"Los Angeles, CA, USA",
+           "timestamp":1589191587
+       },
+       {
+           "user_id":128787321878,
+           "address":"Los Angeles, CA, USA",
+           "timestamp":1589191587
+       },
+       {
+           "user_id":128787321878,
+           "address":"Los Angeles, CA, USA",
+           "timestamp":1589191587
+       }
+   ]
+```
+Similarly, taking the tables `user_address` and `user_info` as examples, the message format would be as follows.
+
+eg: user_address data format
 ```
+     user_address|[
+       {
+           "user_id":128787321878,
+           "address":"Los Angeles, CA, USA",
+           "timestamp":1589191587
+       },
+       {
+           "user_id":128787321878,
+           "address":"Los Angeles, CA, USA",
+           "timestamp":1589191487
+       },
+       {
+           "user_id":128787321878,
+           "address":"Los Angeles, CA, USA",
+           "timestamp":1589191387
+       }
+     ]
+```
+eg: user_info data format
+```
+        user_info|[
+         {
+             "user_id":128787321878,
+             "name":"Zhang San",
+             "age":18,
+             "timestamp":1589191587
+         },
+         {
+             "user_id":128787321878,
+             "name":"Zhang San",
+             "age":18,
+             "timestamp":1589191487
+         },
+         {
+             "user_id":128787321878,
+             "name":"Zhang San",
+             "age":18,
+             "timestamp":1589191387
+         }
+        ]
+```
 
 Create the Doris data table to be imported
 
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
index ca90b37e8f..a0ef3186c8 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
@@ -34,12 +34,12 @@ CREATE ROUTINE LOAD
 
 The Routine Load function allows users to submit a resident import task, and import data into Doris by continuously reading data from a specified data source.
 
-Currently, only data in CSV or Json format can be imported from Kakfa through unauthenticated or SSL authentication.
+Currently, only data in CSV or Json format can be imported from Kafka, either without authentication or via SSL authentication. See the [Example of importing data in Json format](../../../../data-operate/import/import-way/routine-load-manual.md#Example_of_importing_data_in_Json_format).
 
 grammar:
 
 ```sql
-CREATE ROUTINE LOAD [db.]job_name ON tbl_name
+CREATE ROUTINE LOAD [db.]job_name [ON tbl_name]
 [merge_type]
 [load_properties]
 [job_properties]
@@ -53,12 +53,23 @@ FROM data_source [data_source_properties]
 
 - `tbl_name`
 
-  Specifies the name of the table to be imported.
+  Specifies the name of the table to be imported. This is an optional parameter. If it is not specified, the dynamic
+  (multi-)table mode is used, which requires the data in Kafka to carry table name information. Currently the table
+  name can only be obtained from the Kafka value, and it must follow the format `table_name|{"col1": "val1", "col2": "val2"}`
+  for JSON data, where `table_name` is the table name and `|` is the delimiter between the table name and the table
+  data. The same format applies to CSV data, e.g. `table_name|val1,val2,val3`. Note that `table_name` must exactly
+  match a table name in Doris, otherwise the import may fail.
+
+  Tips: The `columns_mapping` parameter is not supported for dynamic tables. If your upstream tables have the same
+  structure as the corresponding Doris tables and there is a large amount of table data to be imported, this method is
+  the best choice.
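+
+  For illustration, assuming a Doris table named `user_address` exists, a Kafka message for the dynamic-table mode
+  could look like the sketch below (JSON and CSV variants, mirroring the routine load manual):
+
+  ```
+  user_address|{"user_id":128787321878,"address":"Los Angeles, CA, USA","timestamp":1589191587}
+  user_address|128787321878,Los Angeles,1589191587
+  ```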
 
 - `merge_type`
 
   Data merge type. The default is APPEND, which means that the imported data are ordinary append write operations. The MERGE and DELETE types are only available for Unique Key model tables. The MERGE type needs to be used with the [DELETE ON] statement to mark the Delete Flag column. The DELETE type means that all imported data are deleted data.
 
+  Tips: When using dynamic multiple tables, make sure this parameter is consistent with the type of each dynamic table; otherwise, the import will fail.
+
 - load_properties
 
   Used to describe imported data. The composition is as follows:
@@ -85,21 +96,29 @@ FROM data_source [data_source_properties]
 
     `(k1, k2, tmpk1, k3 = tmpk1 + 1)`
 
+    Tips: This parameter is not supported for dynamic multiple tables.
+
   - `preceding_filter`
 
     Filter raw data. For a detailed introduction to this part, you can refer to the [Column Mapping, Transformation and Filtering] document.
 
+    Tips: When using dynamic multiple tables, make sure this parameter matches the columns of each dynamic table; otherwise, the import will fail.
+
   - `where_predicates`
 
     Filter imported data based on conditions. For a detailed introduction to this part, you can refer to the [Column Mapping, Transformation and Filtering] document.
 
     `WHERE k1 > 100 and k2 = 1000`
+  
+    Tips: When using dynamic multiple tables, make sure this parameter matches the columns of each dynamic table; otherwise, the import will fail.
 
   - `partitions`
 
     Specify in which partitions of the import destination table. If not specified, it will be automatically imported into the corresponding partition.
 
     `PARTITION(p1, p2, p3)`
+    
+    Tips: When using dynamic multiple tables, make sure this parameter is valid for each dynamic table; otherwise, the import may fail.
 
   - `DELETE ON`
 
@@ -107,9 +126,13 @@ FROM data_source [data_source_properties]
 
     `DELETE ON v3 >100`
 
+    Tips: When using dynamic multiple tables, make sure this parameter is valid for each dynamic table; otherwise, the import may fail.
+
   - `ORDER BY`
 
     Tables only for the Unique Key model. Used to specify the column in the imported data that represents the Sequence Col. Mainly used to ensure data order when importing.
+  
+    Tips: When using dynamic multiple tables, make sure this parameter is valid for each dynamic table; otherwise, the import may fail.
 
 - `job_properties`
 
@@ -356,7 +379,31 @@ FROM data_source [data_source_properties]
    );
    ````
 
-2. Create a Kafka routine import task named test1 for example_tbl of example_db. Import tasks are in strict mode.
+2. Create a Kafka routine load task named "test1" for "example_db" that dynamically loads multiple tables. Specify group.id and client.id, automatically consume all partitions, and subscribe from the earliest position that has data (OFFSET_BEGINNING).
+
+Assume we need to import data from Kafka into the tables "test1" and "test2" in "example_db". We create a single routine load job named "test1" and write the data for both "test1" and "test2" to a Kafka topic named "my_topic", so that data for both tables is imported through one routine load job.
+
+   ```sql
+   CREATE ROUTINE LOAD example_db.test1
+   PROPERTIES
+   (
+       "desired_concurrent_number"="3",
+       "max_batch_interval" = "20",
+       "max_batch_rows" = "300000",
+       "max_batch_size" = "209715200",
+       "strict_mode" = "false"
+   )
+   FROM KAFKA
+   (
+       "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
+       "kafka_topic" = "my_topic",
+       "property.group.id" = "xxx",
+       "property.client.id" = "xxx",
+       "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+   );
+   ```
+
+3. Create a Kafka routine import task named test1 for example_tbl of example_db. Import tasks are in strict mode.
 
    
 
@@ -382,7 +429,7 @@ FROM data_source [data_source_properties]
    );
    ````
 
-3. Import data from the Kafka cluster through SSL authentication. Also set the client.id parameter. The import task is in non-strict mode and the time zone is Africa/Abidjan
+4. Import data from the Kafka cluster through SSL authentication. Also set the client.id parameter. The import task is in non-strict mode and the time zone is Africa/Abidjan
 
    
 
@@ -412,7 +459,7 @@ FROM data_source [data_source_properties]
    );
    ````
 
-4. Import data in Json format. By default, the field name in Json is used as the column name mapping. Specify to import three partitions 0, 1, and 2, and the starting offsets are all 0
+5. Import data in Json format. By default, the field name in Json is used as the column name mapping. Specify to import three partitions 0, 1, and 2, and the starting offsets are all 0
 
    
 
@@ -437,7 +484,7 @@ FROM data_source [data_source_properties]
    );
    ````
 
-5. Import Json data, extract fields through Jsonpaths, and specify the root node of the Json document
+6. Import Json data, extract fields through Jsonpaths, and specify the root node of the Json document
 
    
 
@@ -465,7 +512,7 @@ FROM data_source [data_source_properties]
    );
    ````
 
-6. Create a Kafka routine import task named test1 for example_tbl of example_db. And use conditional filtering.
+7. Create a Kafka routine import task named test1 for example_tbl of example_db. And use conditional filtering.
 
    
 
@@ -492,7 +539,7 @@ FROM data_source [data_source_properties]
    );
    ````
 
-7. Import data to Unique with sequence column Key model table
+8. Import data to Unique with sequence column Key model table
 
    
 
@@ -516,7 +563,7 @@ FROM data_source [data_source_properties]
    );
    ````
 
-8. Consume from a specified point in time
+9. Consume from a specified point in time
 
    
 
diff --git a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD.md
index 9d3ae387e6..0c47239863 100644
--- a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD.md
+++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD.md
@@ -49,7 +49,8 @@ Result description:
            PauseTime: The last job pause time
              EndTime: Job end time
               DbName: corresponding database name
-           TableName: corresponding table name
+           TableName: The name of the corresponding table (for a multi-table job the tables are dynamic, so no specific table name is shown; it is uniformly displayed as "multi-table").
+          IsMultiTbl: Indicates whether this is a multi-table job
                State: job running state
       DataSourceType: Data source type: KAFKA
       CurrentTaskNum: The current number of subtasks
diff --git a/docs/zh-CN/docs/data-operate/import/import-way/routine-load-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/routine-load-manual.md
index 71320d8993..e71523a1bb 100644
--- a/docs/zh-CN/docs/data-operate/import/import-way/routine-load-manual.md
+++ b/docs/zh-CN/docs/data-operate/import/import-way/routine-load-manual.md
@@ -132,17 +132,36 @@ CREATE ROUTINE LOAD example_db.test1 ON example_tbl
 
 ```
 
-3. 导入Json格式数据使用示例
+[3. 导入Json格式数据使用示例](#导入Json格式数据使用示例)
 
    Routine Load导入的json格式仅支持以下两种
 
    第一种只有一条记录,且为json对象:
-
+   当使用**单表导入**(即通过 ON TABLE_NAME 指定 表名)时,json 数据格式如下
    ```json
    {"category":"a9jadhx","author":"test","price":895}
    ```
 
-   第二种为json数组,数组中可含多条记录
+当使用**动态/多表导入** Routine Load (即不指定具体的表名)时,json 数据格式如下
+
+  ```
+  table_name|{"category":"a9jadhx","author":"test","price":895}
+  ```
+假设我们需要导入数据到 user_address 以及 user_info 两张表,那么消息格式如下
+
+eg: user_address 表的 json 数据
+    
+```
+    user_address|{"user_id":128787321878,"address":"朝阳区朝阳大厦XXX号","timestamp":1589191587}
+ ```
+eg: user_info 表的 json 数据
+```
+    user_info|{"user_id":128787321878,"name":"张三","age":18,"timestamp":1589191587}
+```
+
+第二种为json数组,数组中可含多条记录
+
+当使用**单表导入**(即通过 ON TABLE_NAME 指定 表名)时,json 数据格式如下
 
    ```json
    [
@@ -166,6 +185,70 @@ CREATE ROUTINE LOAD example_db.test1 ON example_tbl
        }
    ]
    ```
+当使用**动态/多表导入**(即不指定具体的表名)时,json 数据格式如下
+```
+   table_name|[
+       {
+           "user_id":128787321878,
+           "address":"朝阳区朝阳大厦XXX号",
+           "timestamp":1589191587
+       },
+       {
+           "user_id":128787321878,
+           "address":"朝阳区朝阳大厦XXX号",
+           "timestamp":1589191587
+       },
+       {
+           "user_id":128787321878,
+           "address":"朝阳区朝阳大厦XXX号",
+           "timestamp":1589191587
+       }
+   ]
+```
+同样我们以 `user_address` 以及 `user_info` 两张表为例,那么消息格式如下
+    
+eg: user_address 表的 json 数据
+```
+     user_address|[
+       {
+           "user_id":128787321878,
+           "address":"朝阳区朝阳大厦XXX号",
+           "timestamp":1589191587
+       },
+       {
+           "user_id":128787321878,
+           "address":"朝阳区朝阳大厦XXX号",
+           "timestamp":1589191487
+       },
+       {
+           "user_id":128787321878,
+           "address":"朝阳区朝阳大厦XXX号",
+           "timestamp":1589191387
+       }
+     ]
+```
+eg: user_info 表的 json 数据
+```
+        user_info|[
+         {
+             "user_id":128787321878,
+             "name":"张三",
+             "age":18,
+             "timestamp":1589191587
+         },
+         {
+             "user_id":128787321878,
+             "name":"张三",
+             "age":18,
+             "timestamp":1589191487
+         },
+         {
+             "user_id":128787321878,
+             "name":"张三",
+             "age":18,
+             "timestamp":1589191387
+         }
+        ]
+```
    
    创建待导入的Doris数据表
    
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
index 1992f8a478..cf192fa153 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
@@ -35,12 +35,12 @@ CREATE ROUTINE LOAD
 
 例行导入(Routine Load)功能,支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中。
 
-目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入 CSV 或 Json 格式的数据。
+目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入 CSV 或 Json 格式的数据。 [导入Json格式数据使用示例](../../../../data-operate/import/import-way/routine-load-manual.md#导入Json格式数据使用示例)
 
 语法:
 
 ```sql
-CREATE ROUTINE LOAD [db.]job_name ON tbl_name
+CREATE ROUTINE LOAD [db.]job_name [ON tbl_name]
 [merge_type]
 [load_properties]
 [job_properties]
@@ -53,13 +53,19 @@ FROM data_source [data_source_properties]
 
   导入作业的名称,在同一个 database 内,相同名称只能有一个 job 在运行。
 
-- `tbl_name`
+- `tbl_name` 
 
-  指定需要导入的表的名称。
+  指定需要导入的表的名称,可选参数,如果不指定,则采用动态表的方式,这个时候需要 Kafka 中的数据包含表名的信息。
+  目前仅支持从 Kafka 的 Value 中获取表名,且需要符合这种格式:以 json 为例:`table_name|{"col1": "val1", "col2": "val2"}`, 
+  其中 `tbl_name` 为表名,以 `|` 作为表名和表数据的分隔符。csv 格式的数据也是类似的,如:`table_name|val1,val2,val3`。注意,这里的 
+  `table_name` 必须和 Doris 中的表名一致,否则会导致导入失败.
+  
+   tips: 动态表不支持 `columns_mapping` 参数。如果你的表结构和 Doris 中的表结构一致,且存在大量的表信息需要导入,那么这种方式将是不二选择。
 
 - `merge_type`
 
   数据合并类型。默认为 APPEND,表示导入的数据都是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 模型表。其中 MERGE 类型需要配合 [DELETE ON] 语句使用,以标注 Delete Flag 列。而 DELETE 类型则表示导入的所有数据皆为删除数据。
+  tips: 当使用动态多表的时候,请注意此参数应该符合每张动态表的类型,否则会导致导入失败。
 
 - load_properties
 
@@ -87,21 +93,29 @@ FROM data_source [data_source_properties]
 
     `(k1, k2, tmpk1, k3 = tmpk1 + 1)`
 
+    tips: 动态表不支持此参数。
+
   - `preceding_filter`
 
     过滤原始数据。关于这部分详细介绍,可以参阅 [列的映射,转换与过滤] 文档。
+  
+    tips: 当使用动态多表的时候,请注意此参数应该符合每张动态表的列,否则会导致导入失败。通常在使用动态多表的时候,我们仅建议通用公共列使用此参数。    
 
   - `where_predicates`
 
     根据条件对导入的数据进行过滤。关于这部分详细介绍,可以参阅 [列的映射,转换与过滤] 文档。
 
     `WHERE k1 > 100 and k2 = 1000`
+ 
+     tips: 当使用动态多表的时候,请注意此参数应该符合每张动态表的列,否则会导致导入失败。通常在使用动态多表的时候,我们仅建议通用公共列使用此参数。  
 
   - `partitions`
 
     指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。
 
     `PARTITION(p1, p2, p3)`
+  
+     tips: 当使用动态多表的时候,请注意此参数应该符合每张动态表,否则会导致导入失败。
 
   - `DELETE ON`
 
@@ -109,10 +123,14 @@ FROM data_source [data_source_properties]
 
     `DELETE ON v3 >100`
 
+    tips: 当使用动态多表的时候,请注意此参数应该符合每张动态表,否则会导致导入失败。
+
   - `ORDER BY`
 
     仅针对 Unique Key 模型的表。用于指定导入数据中表示 Sequence Col 的列。主要用于导入时保证数据顺序。
 
+    tips: 当使用动态多表的时候,请注意此参数应该符合每张动态表,否则会导致导入失败。
+
 - `job_properties`
 
   用于指定例行导入作业的通用参数。
@@ -360,7 +378,33 @@ FROM data_source [data_source_properties]
    );
    ```
 
-2. 为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务。导入任务为严格模式。
+2. 为 example_db 创建一个名为 test1 的 Kafka 例行动态多表导入任务。指定列分隔符和 group.id 和 client.id,并且自动默认消费所有分区, 
+   且从有数据的位置(OFFSET_BEGINNING)开始订阅
+
+  我们假设需要将 Kafka 中的数据导入到 example_db 中的 test1 以及 test2 表中,我们创建了一个名为 test1 的例行导入任务,同时将 test1 和 
+  test2 中的数据写到一个名为 `my_topic` 的 Kafka 的 topic 中,这样就可以通过一个例行导入任务将 Kafka 中的数据导入到两个表中。
+
+   ```sql
+   CREATE ROUTINE LOAD example_db.test1
+   PROPERTIES
+   (
+       "desired_concurrent_number"="3",
+       "max_batch_interval" = "20",
+       "max_batch_rows" = "300000",
+       "max_batch_size" = "209715200",
+       "strict_mode" = "false"
+   )
+   FROM KAFKA
+   (
+       "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
+       "kafka_topic" = "my_topic",
+       "property.group.id" = "xxx",
+       "property.client.id" = "xxx",
+       "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+   );
+   ```
+
+3. 为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务。导入任务为严格模式。
 
    
 
@@ -386,7 +430,7 @@ FROM data_source [data_source_properties]
    );
    ```
 
-3. 通过 SSL 认证方式,从 Kafka 集群导入数据。同时设置 client.id 参数。导入任务为非严格模式,时区为 Africa/Abidjan
+4. 通过 SSL 认证方式,从 Kafka 集群导入数据。同时设置 client.id 参数。导入任务为非严格模式,时区为 Africa/Abidjan
 
    
 
@@ -416,7 +460,7 @@ FROM data_source [data_source_properties]
    );
    ```
 
-4. 导入 Json 格式数据。默认使用 Json 中的字段名作为列名映射。指定导入 0,1,2 三个分区,起始 offset 都为 0
+5. 导入 Json 格式数据。默认使用 Json 中的字段名作为列名映射。指定导入 0,1,2 三个分区,起始 offset 都为 0
 
    
 
@@ -441,7 +485,7 @@ FROM data_source [data_source_properties]
    );
    ```
 
-5. 导入 Json 数据,并通过 Jsonpaths 抽取字段,并指定 Json 文档根节点
+6. 导入 Json 数据,并通过 Jsonpaths 抽取字段,并指定 Json 文档根节点
 
    
 
@@ -469,7 +513,7 @@ FROM data_source [data_source_properties]
    );
    ```
 
-6. 为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务。并且使用条件过滤。
+7. 为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务。并且使用条件过滤。
 
    
 
@@ -496,7 +540,7 @@ FROM data_source [data_source_properties]
    );
    ```
 
-7. 导入数据到含有 sequence 列的 Unique Key 模型表中
+8. 导入数据到含有 sequence 列的 Unique Key 模型表中
 
    
 
@@ -520,7 +564,7 @@ FROM data_source [data_source_properties]
    );
    ```
 
-8. 从指定的时间点开始消费
+9. 从指定的时间点开始消费
 
    
 
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD.md
index 08cc4bc2d1..6fc5650f01 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD.md
@@ -49,7 +49,8 @@ SHOW [ALL] ROUTINE LOAD [FOR jobName];
            PauseTime: 最近一次作业暂停时间
              EndTime: 作业结束时间
               DbName: 对应数据库名称
-           TableName: 对应表名称
+           TableName: 对应表名称 (多表的情况下由于是动态表,因此不显示具体表名,我们统一显示 multi-table )
+           IsMultiTbl: 是否为多表
                State: 作业运行状态
       DataSourceType: 数据源类型:KAFKA
       CurrentTaskNum: 当前子任务数量
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index b4dac21cf7..cf95aea327 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -792,6 +792,7 @@ nonterminal Expr sign_chain_expr;
 nonterminal Qualifier opt_set_qualifier;
 nonterminal Operation set_op;
 nonterminal ArrayList<String> opt_common_hints;
+nonterminal String optional_on_ident;
 
 nonterminal LoadTask.MergeType opt_merge_type, opt_with_merge_type;
 
@@ -2604,7 +2605,7 @@ resource_desc ::=
 
 // Routine load statement
 create_routine_load_stmt ::=
-    KW_CREATE KW_ROUTINE KW_LOAD job_label:jobLabel KW_ON ident:tableName
+    KW_CREATE KW_ROUTINE KW_LOAD job_label:jobLabel optional_on_ident:tableName
     opt_with_merge_type:mergeType
     opt_load_property_list:loadPropertyList
     opt_properties:properties
@@ -2615,7 +2616,9 @@ create_routine_load_stmt ::=
          properties, type, customProperties, mergeType, comment);
     :}
     ;
-
+optional_on_ident ::= KW_ON ident:tableName 
+                    {: RESULT = tableName; :}
+                  | {: RESULT = null; :};
 opt_load_property_list ::=
     {:
         RESULT = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
index edf2800fc9..8c32ae3cb5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
@@ -212,7 +212,7 @@ public class AlterRoutineLoadStmt extends DdlStmt {
         RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager()
                 .getJob(getDbName(), getLabel());
         this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
-                .createDataSource(job.getDataSourceType().name(), dataSourceMapProperties);
+                .createDataSource(job.getDataSourceType().name(), dataSourceMapProperties, job.isMultiTable());
         dataSourceProperties.setAlter(true);
         dataSourceProperties.setTimezone(job.getTimezone());
         dataSourceProperties.analyze();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 442dfa3a0e..b0e559de15 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -38,6 +38,7 @@ import org.apache.doris.qe.ConnectContext;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -133,7 +134,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .build();
 
     private final LabelName labelName;
-    private final String tableName;
+    private String tableName;
     private final List<ParseNode> loadPropertyList;
     private final Map<String, String> jobProperties;
     private final String typeName;
@@ -170,6 +171,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
 
     private LoadTask.MergeType mergeType;
 
+    private boolean isMultiTable = false;
+
     public static final Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v) -> v > 0L;
     public static final Predicate<Long> MAX_ERROR_NUMBER_PRED = (v) -> v >= 0L;
     public static final Predicate<Long> MAX_BATCH_INTERVAL_PRED = (v) -> v >= 5 && v <= 60;
@@ -183,12 +186,15 @@ public class CreateRoutineLoadStmt extends DdlStmt {
                                  Map<String, String> dataSourceProperties, LoadTask.MergeType mergeType,
                                  String comment) {
         this.labelName = labelName;
+        if (StringUtils.isBlank(tableName)) {
+            this.isMultiTable = true;
+        }
         this.tableName = tableName;
         this.loadPropertyList = loadPropertyList;
         this.jobProperties = jobProperties == null ? Maps.newHashMap() : jobProperties;
         this.typeName = typeName.toUpperCase();
         this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
-                .createDataSource(typeName, dataSourceProperties);
+                .createDataSource(typeName, dataSourceProperties, this.isMultiTable);
         this.mergeType = mergeType;
         if (comment != null) {
             this.comment = comment;
@@ -316,10 +322,13 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         labelName.analyze(analyzer);
         dbName = labelName.getDbName();
         name = labelName.getLabelName();
+        Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
+        if (isMultiTable) {
+            return;
+        }
         if (Strings.isNullOrEmpty(tableName)) {
             throw new AnalysisException("Table name should not be null");
         }
-        Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
         Table table = db.getTableOrAnalysisException(tableName);
         if (mergeType != LoadTask.MergeType.APPEND
                 && (table.getType() != Table.TableType.OLAP
@@ -352,6 +361,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
                     columnSeparator = (Separator) parseNode;
                     columnSeparator.analyze(null);
                 } else if (parseNode instanceof ImportColumnsStmt) {
+                    if (isMultiTable) {
+                        throw new AnalysisException("Multi-table load does not support setting columns info");
+                    }
                     // check columns info
                     if (importColumnsStmt != null) {
                         throw new AnalysisException("repeat setting of columns info");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java
index 862858a684..b4f7162db7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java
@@ -74,6 +74,7 @@ public class ShowRoutineLoadStmt extends ShowStmt {
                     .add("EndTime")
                     .add("DbName")
                     .add("TableName")
+                    .add("IsMultiTable")
                     .add("State")
                     .add("DataSourceType")
                     .add("CurrentTaskNum")
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/AbstractDataSourceProperties.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/AbstractDataSourceProperties.java
index 7a5ffcd7d3..c7ea586317 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/AbstractDataSourceProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/AbstractDataSourceProperties.java
@@ -57,14 +57,25 @@ public abstract class AbstractDataSourceProperties {
     protected String timezone;
 
 
-    public AbstractDataSourceProperties(Map<String, String> dataSourceProperties) {
+    public AbstractDataSourceProperties(Map<String, String> dataSourceProperties, boolean multiTable) {
         this.originalDataSourceProperties = dataSourceProperties;
+        this.multiTable = multiTable;
+    }
+
+    public AbstractDataSourceProperties(Map<String, String> originalDataSourceProperties) {
+        this.originalDataSourceProperties = originalDataSourceProperties;
     }
 
     protected abstract String getDataSourceType();
 
     protected abstract List<String> getRequiredProperties() throws UserException;
 
+    /**
+     * Whether the data source is used by a multi-table load job. Default is false.
+     */
+    protected boolean multiTable = false;
+
     /**
      * Check required properties
      * we can check for optional mutex parameters, and whether the concrete type is null in the future
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 9409218cfd..0d6cb534cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -115,6 +115,16 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         this.progress = new KafkaProgress();
     }
 
+    public KafkaRoutineLoadJob(Long id, String name, String clusterName,
+                               long dbId, String brokerList, String topic,
+                               UserIdentity userIdentity, boolean isMultiTable) {
+        super(id, name, clusterName, dbId, LoadDataSourceType.KAFKA, userIdentity);
+        this.brokerList = brokerList;
+        this.topic = topic;
+        this.progress = new KafkaProgress();
+        setMultiTable(isMultiTable);
+    }
+
     public String getTopic() {
         return topic;
     }
@@ -216,7 +226,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                                 ((KafkaProgress) progress).getOffsetByPartition(kafkaPartition));
                     }
                     KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), id, clusterName,
-                            maxBatchIntervalS * 2 * 1000, taskKafkaProgress);
+                            maxBatchIntervalS * 2 * 1000, taskKafkaProgress, isMultiTable());
                     routineLoadTaskInfoList.add(kafkaTaskInfo);
                     result.add(kafkaTaskInfo);
                 }
@@ -285,7 +295,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         KafkaTaskInfo oldKafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo;
         // add new task
         KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(oldKafkaTaskInfo,
-                ((KafkaProgress) progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions()));
+                ((KafkaProgress) progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions()), isMultiTable());
         // remove old task
         routineLoadTaskInfoList.remove(routineLoadTaskInfo);
         // add new task
@@ -395,16 +405,23 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) throws UserException {
         // check db and table
         Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDBName());
-        OlapTable olapTable = db.getOlapTableOrDdlException(stmt.getTableName());
-        checkMeta(olapTable, stmt.getRoutineLoadDesc());
-        long tableId = olapTable.getId();
 
-        // init kafka routine load job
         long id = Env.getCurrentEnv().getNextId();
         KafkaDataSourceProperties kafkaProperties = (KafkaDataSourceProperties) stmt.getDataSourceProperties();
-        KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(),
-                db.getClusterName(), db.getId(), tableId,
-                kafkaProperties.getBrokerList(), kafkaProperties.getTopic(), stmt.getUserInfo());
+        KafkaRoutineLoadJob kafkaRoutineLoadJob;
+        if (kafkaProperties.isMultiTable()) {
+            kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(),
+                    db.getClusterName(), db.getId(),
+                    kafkaProperties.getBrokerList(), kafkaProperties.getTopic(), stmt.getUserInfo(), true);
+        } else {
+            OlapTable olapTable = db.getOlapTableOrDdlException(stmt.getTableName());
+            checkMeta(olapTable, stmt.getRoutineLoadDesc());
+            long tableId = olapTable.getId();
+            // init kafka routine load job
+            kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(),
+                    db.getClusterName(), db.getId(), tableId,
+                    kafkaProperties.getBrokerList(), kafkaProperties.getTopic(), stmt.getUserInfo());
+        }
         kafkaRoutineLoadJob.setOptional(stmt);
         kafkaRoutineLoadJob.checkCustomProperties();
         kafkaRoutineLoadJob.checkCustomPartition();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index f11d2ad373..18029eba59 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -47,14 +47,14 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
     private Map<Integer, Long> partitionIdToOffset;
 
     public KafkaTaskInfo(UUID id, long jobId, String clusterName,
-            long timeoutMs, Map<Integer, Long> partitionIdToOffset) {
-        super(id, jobId, clusterName, timeoutMs);
+                         long timeoutMs, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
+        super(id, jobId, clusterName, timeoutMs, isMultiTable);
         this.partitionIdToOffset = partitionIdToOffset;
     }
 
-    public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long> partitionIdToOffset) {
+    public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long> partitionIdToOffset, boolean isMultiTable) {
         super(UUID.randomUUID(), kafkaTaskInfo.getJobId(), kafkaTaskInfo.getClusterName(),
-                kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId());
+                kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(), isMultiTable);
         this.partitionIdToOffset = partitionIdToOffset;
     }
 
@@ -74,9 +74,8 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         tRoutineLoadTask.setTxnId(txnId);
         Database database =
                 Env.getCurrentInternalCatalog().getDbOrMetaException(routineLoadJob.getDbId());
-        Table tbl = database.getTableOrMetaException(routineLoadJob.getTableId());
+
         tRoutineLoadTask.setDb(database.getFullName());
-        tRoutineLoadTask.setTbl(tbl.getName());
         // label = job_name+job_id+task_id+txn_id
         String label = Joiner.on("-").join(routineLoadJob.getName(),
                 routineLoadJob.getId(), DebugUtil.printId(id), txnId);
@@ -89,10 +88,15 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         tKafkaLoadInfo.setProperties(routineLoadJob.getConvertedCustomProperties());
         tRoutineLoadTask.setKafkaLoadInfo(tKafkaLoadInfo);
         tRoutineLoadTask.setType(TLoadSourceType.KAFKA);
-        if (Config.enable_pipeline_load) {
-            tRoutineLoadTask.setPipelineParams(rePlanForPipeline(routineLoadJob));
-        } else {
-            tRoutineLoadTask.setParams(rePlan(routineLoadJob));
+        tRoutineLoadTask.setIsMultiTable(isMultiTable);
+        if (!isMultiTable) {
+            Table tbl = database.getTableOrMetaException(routineLoadJob.getTableId());
+            tRoutineLoadTask.setTbl(tbl.getName());
+            if (Config.enable_pipeline_load) {
+                tRoutineLoadTask.setPipelineParams(rePlanForPipeline(routineLoadJob));
+            } else {
+                tRoutineLoadTask.setParams(rePlan(routineLoadJob));
+            }
         }
         tRoutineLoadTask.setMaxIntervalS(routineLoadJob.getMaxBatchIntervalS());
         tRoutineLoadTask.setMaxBatchRows(routineLoadJob.getMaxBatchRows());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadDataSourcePropertyFactory.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadDataSourcePropertyFactory.java
index 10f20a0012..bb371ba4ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadDataSourcePropertyFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadDataSourcePropertyFactory.java
@@ -25,14 +25,22 @@ import java.util.Map;
  * RoutineLoadDataSourcePropertyFactory is used to create data source properties
  * for routine load job.
  * <p>
- *     Currently, we only support kafka data source.
- *     If we want to support more data source, we can add more data source properties here.
- *     And we can add more data source type in LoadDataSourceType.
- *     Then we can use this factory to create data source properties.
- *</p>
+ * Currently, we only support kafka data source.
+ * If we want to support more data source, we can add more data source properties here.
+ * And we can add more data source type in LoadDataSourceType.
+ * Then we can use this factory to create data source properties.
+ * </p>
  */
 public class RoutineLoadDataSourcePropertyFactory {
 
+    public static AbstractDataSourceProperties createDataSource(String type, Map<String, String> parameters,
+                                                                boolean multiLoad) {
+        if (type.equals(LoadDataSourceType.KAFKA.name())) {
+            return new KafkaDataSourceProperties(parameters, multiLoad);
+        }
+        throw new IllegalArgumentException("Unknown routine load data source type: " + type);
+    }
+
     public static AbstractDataSourceProperties createDataSource(String type, Map<String, String> parameters) {
         if (type.equals(LoadDataSourceType.KAFKA.name())) {
             return new KafkaDataSourceProperties(parameters);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 56186cbda2..b0ae15b3f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -77,6 +77,7 @@ import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import lombok.Getter;
+import lombok.Setter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -114,6 +115,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
 
     protected static final String STAR_STRING = "*";
 
+    @Getter
+    @Setter
+    private boolean isMultiTable = false;
+
     /*
                      +-----------------+
     fe schedule job  |  NEED_SCHEDULE  |  user resume job
@@ -196,8 +201,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     /**
      * RoutineLoad support json data.
      * Require Params:
-     *   1) format = "json"
-     *   2) jsonPath = "$.XXX.xxx"
+     * 1) format = "json"
+     * 2) jsonPath = "$.XXX.xxx"
      */
     private static final String PROPS_FORMAT = "format";
     private static final String PROPS_STRIP_OUTER_ARRAY = "strip_outer_array";
@@ -276,6 +281,29 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         }
     }
 
+    /**
+     * Multi-table load jobs use this constructor.
+     */
+    public RoutineLoadJob(Long id, String name, String clusterName,
+                          long dbId, LoadDataSourceType dataSourceType,
+                          UserIdentity userIdentity) {
+        this(id, dataSourceType);
+        this.name = name;
+        this.clusterName = clusterName;
+        this.dbId = dbId;
+        this.authCode = 0;
+        this.userIdentity = userIdentity;
+        this.isMultiTable = true;
+
+        if (ConnectContext.get() != null) {
+            SessionVariable var = ConnectContext.get().getSessionVariable();
+            sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
+        } else {
+            sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
+        }
+    }
+
+
     protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
         setRoutineLoadDesc(stmt.getRoutineLoadDesc());
         if (stmt.getDesiredConcurrentNum() != -1) {
@@ -420,6 +448,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
 
     public String getTableName() throws MetaNotFoundException {
         Database database = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
+        if (isMultiTable) {
+            return null;
+        }
         return database.getTableOrMetaException(tableId).getName();
     }
 
@@ -724,7 +755,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     }
 
     private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes,
-            long taskExecutionTime, boolean isReplay) throws UserException {
+                                 long taskExecutionTime, boolean isReplay) throws UserException {
         this.jobStatistic.totalRows += numOfTotalRows;
         this.jobStatistic.errorRows += numOfErrorRows;
         this.jobStatistic.unselectedRows += unselectedRows;
@@ -753,7 +784,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                 if (!isReplay) {
                     // remove all of task in jobs and change job state to paused
                     updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR,
-                                    "current error rows of job is more than max error num"), isReplay);
+                            "current error rows of job is more than max error num"), isReplay);
                 }
             }
 
@@ -779,7 +810,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
             if (!isReplay) {
                 // remove all of task in jobs and change job state to paused
                 updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR,
-                                "current error rows is more than max error num"), isReplay);
+                        "current error rows is more than max error num"), isReplay);
             }
             // reset currentTotalNum and currentErrorNum
             this.jobStatistic.currentErrorRows = 0;
@@ -806,8 +837,12 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
 
     private void initPlanner() throws UserException {
         Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
+        // For multi-table load jobs the table name is dynamic; the table is set when the task is scheduled.
+        if (isMultiTable) {
+            return;
+        }
         planner = new StreamLoadPlanner(db,
-            (OlapTable) db.getTableOrMetaException(this.tableId, Table.TableType.OLAP), this);
+                (OlapTable) db.getTableOrMetaException(this.tableId, Table.TableType.OLAP), this);
     }
 
     public TExecPlanFragmentParams plan(TUniqueId loadId, long txnId) throws UserException {
@@ -857,9 +892,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     public void beforeAborted(TransactionState txnState) throws TransactionException {
         if (LOG.isDebugEnabled()) {
             LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, txnState.getLabel())
-                              .add("txn_state", txnState)
-                              .add("msg", "task before aborted")
-                              .build());
+                    .add("txn_state", txnState)
+                    .add("msg", "task before aborted")
+                    .build());
         }
         executeBeforeCheck(txnState, TransactionStatus.ABORTED);
     }
@@ -871,9 +906,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     public void beforeCommitted(TransactionState txnState) throws TransactionException {
         if (LOG.isDebugEnabled()) {
             LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, txnState.getLabel())
-                              .add("txn_state", txnState)
-                              .add("msg", "task before committed")
-                              .build());
+                    .add("txn_state", txnState)
+                    .add("msg", "task before committed")
+                    .build());
         }
         executeBeforeCheck(txnState, TransactionStatus.COMMITTED);
     }
@@ -1038,9 +1073,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                 // step1: job state will be changed depending on txnStatusChangeReasonString
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, txnState.getLabel())
-                                      .add("txn_id", txnState.getTransactionId())
-                                      .add("msg", "txn abort with reason " + txnStatusChangeReasonString)
-                                      .build());
+                            .add("txn_id", txnState.getTransactionId())
+                            .add("msg", "txn abort with reason " + txnStatusChangeReasonString)
+                            .build());
                 }
                 ++this.jobStatistic.abortedTaskNum;
                 TransactionState.TxnStatusChangeReason txnStatusChangeReason = null;
@@ -1071,7 +1106,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
             String msg = "be " + taskBeId + " abort task " + txnState.getLabel()
                     + " failed with error " + e.getMessage();
             updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
-                        false /* not replay */);
+                    false /* not replay */);
             LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
                     .add("task_id", txnState.getLabel())
                     .add("error_msg", "change job state to paused"
@@ -1095,7 +1130,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
 
     // check task exists or not before call method
     private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState,
-            TransactionStatus txnStatus, TransactionState.TxnStatusChangeReason txnStatusChangeReason)
+                                               TransactionStatus txnStatus,
+                                               TransactionState.TxnStatusChangeReason txnStatusChangeReason)
             throws UserException {
         // step0: get progress from transaction state
         RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment
@@ -1103,11 +1139,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         if (rlTaskTxnCommitAttachment == null) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId())
-                                  .add("job_id", routineLoadTaskInfo.getJobId())
-                                  .add("txn_id", routineLoadTaskInfo.getTxnId())
-                                  .add("msg", "commit task will be ignore when attachment txn of task is null,"
-                                          + " maybe task was aborted by master when timeout")
-                                  .build());
+                        .add("job_id", routineLoadTaskInfo.getJobId())
+                        .add("txn_id", routineLoadTaskInfo.getTxnId())
+                        .add("msg", "commit task will be ignore when attachment txn of task is null,"
+                                + " maybe task was aborted by master when timeout")
+                        .build());
             }
         } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) {
             // step2: update job progress
@@ -1202,10 +1238,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
             Env.getCurrentEnv().getEditLog().logOpRoutineLoadJob(new RoutineLoadOperation(id, jobState));
         }
         LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                         .add("current_job_state", getState())
-                         .add("msg", "job state has been changed")
-                         .add("is replay", String.valueOf(isReplay))
-                         .build());
+                .add("current_job_state", getState())
+                .add("msg", "job state has been changed")
+                .add("is replay", String.valueOf(isReplay))
+                .build());
     }
 
     private void executeRunning() {
@@ -1244,8 +1280,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         Database database = Env.getCurrentInternalCatalog().getDbNullable(dbId);
         if (database == null) {
             LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                             .add("db_id", dbId)
-                             .add("msg", "The database has been deleted. Change job state to cancelled").build());
+                    .add("db_id", dbId)
+                    .add("msg", "The database has been deleted. Change job state to cancelled").build());
             writeLock();
             try {
                 if (!state.isFinalState()) {
@@ -1261,10 +1297,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
 
         // check table belong to database
         Table table = database.getTableNullable(tableId);
-        if (table == null) {
+        if (table == null && !isMultiTable) {
             LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id).add("db_id", dbId)
-                             .add("table_id", tableId)
-                             .add("msg", "The table has been deleted change job state to cancelled").build());
+                    .add("table_id", tableId)
+                    .add("msg", "The table has been deleted change job state to cancelled").build());
             writeLock();
             try {
                 if (!state.isFinalState()) {
@@ -1334,7 +1370,12 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
             row.add(TimeUtils.longToTimeString(pauseTimestamp));
             row.add(TimeUtils.longToTimeString(endTimestamp));
             row.add(database.map(Database::getFullName).orElse(String.valueOf(dbId)));
-            row.add(table.map(Table::getName).orElse(String.valueOf(tableId)));
+            if (isMultiTable) {
+                row.add("");
+            } else {
+                row.add(table.map(Table::getName).orElse(String.valueOf(tableId)));
+            }
+            row.add(Boolean.toString(isMultiTable));
             row.add(getState().name());
             row.add(dataSourceType.name());
             row.add(String.valueOf(getSizeOfRoutineLoadTaskInfoList()));
@@ -1385,7 +1426,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         // 1.job_name
         sb.append("CREATE ROUTINE LOAD ").append(name);
         // 2.tbl_name
-        sb.append(" ON ").append(table.map(Table::getName).orElse(String.valueOf(tableId))).append("\n");
+        if (!isMultiTable) {
+            sb.append(" ON ").append(table.map(Table::getName).orElse(String.valueOf(tableId))).append("\n");
+        }
         // 3.merge_type
         sb.append("WITH ").append(mergeType.name()).append("\n");
         // 4.load_properties
@@ -1486,9 +1529,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     private String getTaskStatistic() {
         Map<String, String> result = Maps.newHashMap();
         result.put("running_task",
-                   String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> entity.isRunning()).count()));
+                String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> entity.isRunning()).count()));
         result.put("waiting_task",
-                   String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> !entity.isRunning()).count()));
+                String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> !entity.isRunning()).count()));
         Gson gson = new GsonBuilder().disableHtmlEscaping().create();
         return gson.toJson(result);
     }
@@ -1612,6 +1655,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         clusterName = Text.readString(in);
         dbId = in.readLong();
         tableId = in.readLong();
+        if (tableId == 0) {
+            isMultiTable = true;
+        }
         desireTaskConcurrentNum = in.readInt();
         state = JobState.valueOf(Text.readString(in));
         maxErrorNum = in.readLong();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index fd6a223e9f..bdfe5de0df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -55,6 +55,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -184,16 +185,10 @@ public class RoutineLoadManager implements Writable {
     private void unprotectedAddJob(RoutineLoadJob routineLoadJob) {
         idToRoutineLoadJob.put(routineLoadJob.getId(), routineLoadJob);
 
-        Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = dbToNameToRoutineLoadJob.get(routineLoadJob.getDbId());
-        if (nameToRoutineLoadJob == null) {
-            nameToRoutineLoadJob = Maps.newConcurrentMap();
-            dbToNameToRoutineLoadJob.put(routineLoadJob.getDbId(), nameToRoutineLoadJob);
-        }
-        List<RoutineLoadJob> routineLoadJobList = nameToRoutineLoadJob.get(routineLoadJob.getName());
-        if (routineLoadJobList == null) {
-            routineLoadJobList = Lists.newArrayList();
-            nameToRoutineLoadJob.put(routineLoadJob.getName(), routineLoadJobList);
-        }
+        Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = dbToNameToRoutineLoadJob
+                .computeIfAbsent(routineLoadJob.getDbId(), k -> Maps.newConcurrentMap());
+        List<RoutineLoadJob> routineLoadJobList = nameToRoutineLoadJob
+                .computeIfAbsent(routineLoadJob.getName(), k -> Lists.newArrayList());
         routineLoadJobList.add(routineLoadJob);
         // add txn state callback in factory
         Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(routineLoadJob);
@@ -229,6 +224,18 @@ public class RoutineLoadManager implements Writable {
         } catch (MetaNotFoundException e) {
             throw new DdlException("The metadata of job has been changed. The job will be cancelled automatically", e);
         }
+        if (routineLoadJob.isMultiTable()) {
+            if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
+                    dbFullName,
+                    PrivPredicate.LOAD)) {
+                // TODO: add a dedicated error code for db-level access denial
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
+                        ConnectContext.get().getQualifiedUser(),
+                        ConnectContext.get().getRemoteIP(),
+                        dbFullName);
+            }
+            return routineLoadJob;
+        }
         if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
                 dbFullName,
                 tableName,
@@ -243,7 +250,7 @@ public class RoutineLoadManager implements Writable {
 
     // get all jobs which state is not in final state from specified database
     public List<RoutineLoadJob> checkPrivAndGetAllJobs(String dbName)
-            throws MetaNotFoundException, DdlException, AnalysisException {
+            throws MetaNotFoundException, DdlException {
 
         List<RoutineLoadJob> result = Lists.newArrayList();
         Database database = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
@@ -258,8 +265,8 @@ public class RoutineLoadManager implements Writable {
             for (RoutineLoadJob job : jobs) {
                 if (!job.getState().isFinalState()) {
                     String tableName = job.getTableName();
-                    if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
-                            dbName, tableName, PrivPredicate.LOAD)) {
+                    if (!job.isMultiTable() && !Env.getCurrentEnv().getAccessManager()
+                            .checkTblPriv(ConnectContext.get(), dbName, tableName, PrivPredicate.LOAD)) {
                         continue;
                     }
                     result.add(job);
@@ -534,7 +541,7 @@ public class RoutineLoadManager implements Writable {
 
     public RoutineLoadJob getJob(String dbFullName, String jobName) throws MetaNotFoundException {
         List<RoutineLoadJob> routineLoadJobList = getJob(dbFullName, jobName, false, null);
-        if (routineLoadJobList == null || routineLoadJobList.size() == 0) {
+        if (CollectionUtils.isEmpty(routineLoadJobList)) {
             return null;
         } else {
             return routineLoadJobList.get(0);
@@ -542,8 +549,8 @@ public class RoutineLoadManager implements Writable {
     }
 
     /*
-      if dbFullName is null, result = all of routine load job in all of db
-      else if jobName is null, result =  all of routine load job in dbFullName
+      if dbFullName is null, result = all routine load jobs in all dbs
+      else if jobName is null, result = all routine load jobs in dbFullName
 
       if includeHistory is false, filter not running job in result
       else return all of result
@@ -595,7 +602,7 @@ public class RoutineLoadManager implements Writable {
         return result;
     }
 
-    // return all of routine load job named jobName in all of db
+    // return all routine load jobs named jobName across all dbs
     public List<RoutineLoadJob> getJobByName(String jobName) {
         List<RoutineLoadJob> result = Lists.newArrayList();
         for (Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob : dbToNameToRoutineLoadJob.values()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 01ea0ebc2b..77758a72d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -70,20 +70,24 @@ public abstract class RoutineLoadTaskInfo {
 
     protected long timeoutMs = -1;
 
+    protected boolean isMultiTable = false;
+
     // this status will be set when corresponding transaction's status is changed.
     // so that user or other logic can know the status of the corresponding txn.
     protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
 
-    public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs) {
+    public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs, boolean isMultiTable) {
         this.id = id;
         this.jobId = jobId;
         this.clusterName = clusterName;
         this.createTimeMs = System.currentTimeMillis();
         this.timeoutMs = timeoutMs;
+        this.isMultiTable = isMultiTable;
     }
 
-    public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs, long previousBeId) {
-        this(id, jobId, clusterName, timeoutMs);
+    public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs, long previousBeId,
+                               boolean isMultiTable) {
+        this(id, jobId, clusterName, timeoutMs, isMultiTable);
         this.previousBeId = previousBeId;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index cadd1adc88..94f5d4388f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -109,7 +109,6 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             scheduleOneTask(routineLoadTaskInfo);
         } catch (Exception e) {
             LOG.warn("Taking routine load task from queue has been interrupted", e);
-            return;
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaConfigType.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaConfigType.java
new file mode 100644
index 0000000000..a86e83ebe2
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaConfigType.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload.kafka;
+
+public class KafkaConfigType {
+
+    /**
+     * reserved for future support
+     */
+    enum TableNameLocation {
+        KEY
+    }
+
+    enum TableNameFormat {
+        TEXT
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaConfiguration.java
index 8fb65fbe0c..959d88bb5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaConfiguration.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaConfiguration.java
@@ -37,8 +37,15 @@ public enum KafkaConfiguration {
     KAFKA_OFFSETS("kafka_offsets", null, offsetsString -> Splitter.on(",").trimResults().splitToList(offsetsString)),
 
     KAFKA_DEFAULT_OFFSETS("kafka_default_offsets", "OFFSET_END", offset -> offset),
-    KAFKA_ORIGIN_DEFAULT_OFFSETS("kafka_origin_default_offsets", null, offset -> offset);
-
+    KAFKA_ORIGIN_DEFAULT_OFFSETS("kafka_origin_default_offsets", null, offset -> offset),
+    KAFKA_TABLE_NAME_LOCATION("kafka_table_name_location", "key",
+            value -> value.replace(" ", "")),
+    KAFKA_TABLE_NAME_FORMAT("kafka_table_name_format", "TEXT",
+            value -> value.replace(" ", "")),
+    KAFKA_TEXT_TABLE_NAME_FIELD_INDEX("kafka_text_table_name_field_index", 0, Integer::parseInt),
+
+    KAFKA_TEXT_TABLE_NAME_FIELD_DELIMITER("kafka_text_table_name_field_delimiter", ",",
+            value -> value.replace(" ", ""));
     private final String name;
 
     public String getName() {
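
The four entries added above reuse KafkaConfiguration's existing name / default / converter pattern, so kafka_text_table_name_field_index parses to an int while the format, location and delimiter values have their whitespace stripped. A minimal self-contained sketch of that pattern, with hypothetical names and assuming a missing property falls back to the declared default:

    import java.util.Map;
    import java.util.function.Function;

    // Sketch of the name / default / converter pattern used by KafkaConfiguration (hypothetical enum).
    enum KafkaPropertySketch {
        TABLE_NAME_FORMAT("kafka_table_name_format", "TEXT", v -> v.replace(" ", "")),
        TABLE_NAME_FIELD_INDEX("kafka_text_table_name_field_index", 0, Integer::parseInt);

        private final String name;
        private final Object defaultValue;
        private final Function<String, Object> converter;

        KafkaPropertySketch(String name, Object defaultValue, Function<String, Object> converter) {
            this.name = name;
            this.defaultValue = defaultValue;
            this.converter = converter;
        }

        // Missing property -> declared default; present property -> converted value.
        Object valueFrom(Map<String, String> props) {
            String raw = props.get(name);
            return raw == null ? defaultValue : converter.apply(raw);
        }
    }
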
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaDataSourceProperties.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaDataSourceProperties.java
index 42e5f3e03f..d39d1de7df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaDataSourceProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaDataSourceProperties.java
@@ -73,15 +73,41 @@ public class KafkaDataSourceProperties extends AbstractDataSourceProperties {
     @SerializedName(value = "topic")
     private String topic;
 
+    /**
+     * The table name properties of kafka data source
+     * <p>
+     * table_name_location: table name location
+     * 1. table name location is in the key of kafka message
+     * 2. table name location is in the value of kafka message
+     * <p>
+     * table_name_format: table name format
+     * 1. json format
+     * 2. txt format
+     * <p>
+     * table_name_regex: table name regex
+     */
+    @Getter
+    @SerializedName(value = "tableNameProperties")
+    private Map<String, String> tableNameProperties;
+
     private static final ImmutableSet<String> CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET =
             new ImmutableSet.Builder<String>().add(KafkaConfiguration.KAFKA_BROKER_LIST.getName())
                     .add(KafkaConfiguration.KAFKA_TOPIC.getName())
                     .add(KafkaConfiguration.KAFKA_PARTITIONS.getName())
                     .add(KafkaConfiguration.KAFKA_OFFSETS.getName())
-                    .add(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName()).build();
+                    .add(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName())
+                    .add(KafkaConfiguration.KAFKA_TABLE_NAME_LOCATION.getName())
+                    .add(KafkaConfiguration.KAFKA_TABLE_NAME_FORMAT.getName())
+                    .add(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_DELIMITER.getName())
+                    .add(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_INDEX.getName())
+                    .build();
 
-    public KafkaDataSourceProperties(Map<String, String> dataSourceProperties) {
-        super(dataSourceProperties);
+    public KafkaDataSourceProperties(Map<String, String> dataSourceProperties, boolean multiLoad) {
+        super(dataSourceProperties, multiLoad);
+    }
+
+    public KafkaDataSourceProperties(Map<String, String> originalDataSourceProperties) {
+        super(originalDataSourceProperties);
     }
 
     @Override
@@ -142,6 +168,9 @@ public class KafkaDataSourceProperties extends AbstractDataSourceProperties {
             throw new AnalysisException("Only one of " + KafkaConfiguration.KAFKA_OFFSETS.getName() + " and "
                     + KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName() + " can be set.");
         }
+        if (multiTable) {
+            checkAndSetMultiLoadProperties();
+        }
         if (isAlter() && CollectionUtils.isNotEmpty(partitions) && CollectionUtils.isEmpty(offsets)
                 && StringUtils.isBlank(defaultOffsetString)) {
             // if this is an alter operation, the partition and (default)offset must be set together.
@@ -159,6 +188,24 @@ public class KafkaDataSourceProperties extends AbstractDataSourceProperties {
 
     }
 
+    private void checkAndSetMultiLoadProperties() throws AnalysisException {
+        String tableNameFormat = KafkaConfiguration.KAFKA_TABLE_NAME_FORMAT.getParameterValue(
+                originalDataSourceProperties.get(KafkaConfiguration.KAFKA_TABLE_NAME_FORMAT.getName()));
+        if (!KafkaConfigType.TableNameFormat.TEXT.name().equalsIgnoreCase(tableNameFormat)) {
+            throw new AnalysisException("Multi load olay supported for table name format TEXT");
+        }
+        String tableNameDelimiter = KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_DELIMITER.getParameterValue(
+                originalDataSourceProperties.get(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_DELIMITER.getName()));
+
+        Integer tableNameIndex = KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_INDEX.getParameterValue(
+                originalDataSourceProperties.get(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_INDEX.getName()));
+        tableNameProperties = new HashMap<>();
+        tableNameProperties.put(KafkaConfiguration.KAFKA_TABLE_NAME_FORMAT.getName(), tableNameFormat);
+        tableNameProperties.put(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_DELIMITER.getName(), tableNameDelimiter);
+        tableNameProperties.put(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_INDEX.getName(),
+                String.valueOf(tableNameIndex));
+    }
+
     private static void setDefaultOffsetForPartition(List<Pair<Integer, Long>> kafkaPartitionOffsets,
                                                      String kafkaDefaultOffsetString, boolean isOffsetsForTimes) {
         if (isOffsetsForTimes) {
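
These table-name properties only describe where the destination table name sits inside each TEXT-format message; the per-message parsing itself happens outside this frontend diff. Purely as an illustration of what kafka_text_table_name_field_delimiter and kafka_text_table_name_field_index are meant to select, a hypothetical extraction could look like:

    import java.util.regex.Pattern;

    // Hypothetical illustration only: pick the destination table name out of a TEXT-format
    // Kafka message value, given the configured field delimiter and field index.
    public final class TableNameExtractorSketch {

        public static String extractTableName(String messageValue, String fieldDelimiter, int fieldIndex) {
            String[] fields = messageValue.split(Pattern.quote(fieldDelimiter), -1);
            if (fieldIndex < 0 || fieldIndex >= fields.length) {
                throw new IllegalArgumentException("table name field index out of range: " + fieldIndex);
            }
            return fields[fieldIndex];
        }

        public static void main(String[] args) {
            // kafka_text_table_name_field_delimiter = "," and kafka_text_table_name_field_index = 0
            // would route this message to table "orders".
            System.out.println(extractTableName("orders,1001,2023-06-09", ",", 0));
        }
    }
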
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index ce65cf3564..15089bf40c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -111,8 +111,12 @@ public class StreamLoadPlanner {
         return destTable;
     }
 
-    // create the plan. the plan's query id and load id are same, using the parameter 'loadId'
     public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
+        return this.plan(loadId, 1);
+    }
+
+    // create the plan. the plan's query id and load id are same, using the parameter 'loadId'
+    public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdIndex) throws UserException {
         if (destTable.getKeysType() != KeysType.UNIQUE_KEYS
                 && taskInfo.getMergeType() != LoadTask.MergeType.APPEND) {
             throw new AnalysisException("load by MERGE or DELETE is only supported in unique tables.");
@@ -271,7 +275,7 @@ public class StreamLoadPlanner {
         TPlanFragmentExecParams execParams = new TPlanFragmentExecParams();
         // user load id (streamLoadTask.id) as query id
         execParams.setQueryId(loadId);
-        execParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + 1));
+        execParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + fragmentInstanceIdIndex));
         execParams.per_exch_num_senders = Maps.newHashMap();
         execParams.destinations = Lists.newArrayList();
         Map<Integer, List<TScanRangeParams>> perNodeScanRange = Maps.newHashMap();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 8b508b2cd7..c3662a248d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -1410,6 +1410,18 @@ public class ShowExecutor {
                                     + "The job will be cancelled automatically")
                             .build(), e);
                 }
+                if (routineLoadJob.isMultiTable()) {
+                    if (!Env.getCurrentEnv().getAccessManager()
+                            .checkDbPriv(ConnectContext.get(), dbFullName, PrivPredicate.LOAD)) {
+                        LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("operator",
+                                        "show routine load job").add("user", ConnectContext.get().getQualifiedUser())
+                                .add("remote_ip", ConnectContext.get().getRemoteIP()).add("db_full_name", dbFullName)
+                                .add("table_name", tableName).add("error_msg", "The database access denied"));
+                        continue;
+                    }
+                    rows.add(routineLoadJob.getShowInfo());
+                    continue;
+                }
                 if (!Env.getCurrentEnv().getAccessManager()
                         .checkTblPriv(ConnectContext.get(), dbFullName, tableName, PrivPredicate.LOAD)) {
                     LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("operator",
@@ -1418,7 +1430,6 @@ public class ShowExecutor {
                             .add("table_name", tableName).add("error_msg", "The table access denied"));
                     continue;
                 }
-
                 // get routine load info
                 rows.add(routineLoadJob.getShowInfo());
             }
@@ -1459,6 +1470,17 @@ public class ShowExecutor {
             throw new AnalysisException("The table metadata of job has been changed."
                     + " The job will be cancelled automatically", e);
         }
+        if (routineLoadJob.isMultiTable()) {
+            if (!Env.getCurrentEnv().getAccessManager()
+                    .checkDbPriv(ConnectContext.get(), dbFullName, PrivPredicate.LOAD)) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, "LOAD",
+                        ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
+                        dbFullName);
+            }
+            rows.addAll(routineLoadJob.getTasksShowInfo());
+            resultSet = new ShowResultSet(showRoutineLoadTaskStmt.getMetaData(), rows);
+            return;
+        }
         if (!Env.getCurrentEnv().getAccessManager()
                 .checkTblPriv(ConnectContext.get(), dbFullName, tableName, PrivPredicate.LOAD)) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 5ffcb71264..eb6886855f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -174,6 +174,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -182,14 +183,17 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.IntSupplier;
+import java.util.stream.Collectors;
 
 // Frontend service used to serve all request for this frontend through
 // thrift protocol
@@ -197,6 +201,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
     private static final Logger LOG = LogManager.getLogger(FrontendServiceImpl.class);
     private MasterImpl masterImpl;
     private ExecuteEnv exeEnv;
+    // key: txn id, value: next plan fragment instance index, used by multi-table load plan requests
+    private ConcurrentHashMap<Long, Integer> multiTableFragmentInstanceIdIndexMap =
+            new ConcurrentHashMap<>(64);
 
     public FrontendServiceImpl(ExecuteEnv exeEnv) {
         masterImpl = new MasterImpl();
@@ -1133,6 +1140,30 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         return result;
     }
 
+    private List<Table> queryLoadCommitTables(TLoadTxnCommitRequest request, Database db) throws UserException {
+        List<String> tbNames;
+        // check whether the request carries multiple tables
+        if (CollectionUtils.isNotEmpty(request.getTbls())) {
+            tbNames = request.getTbls();
+
+        } else {
+            tbNames = Collections.singletonList(request.getTbl());
+        }
+        List<Table> tables = new ArrayList<>(tbNames.size());
+        for (String tbl : tbNames) {
+            OlapTable table = (OlapTable) db.getTableOrMetaException(tbl, TableType.OLAP);
+            tables.add(table);
+        }
+        // for a multi-table request, record the table ids on the running transaction
+        if (CollectionUtils.isNotEmpty(request.getTbls())) {
+            List<Long> multiTableIds = tables.stream().map(Table::getId).collect(Collectors.toList());
+            Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(db.getId())
+                    .updateMultiTableRunningTransactionTableIds(request.getTxnId(), multiTableIds);
+            LOG.debug("txn {} has multi table {}", request.getTxnId(), request.getTbls());
+        }
+        return tables;
+    }
+
     private void loadTxnPreCommitImpl(TLoadTxnCommitRequest request) throws UserException {
         String cluster = request.getCluster();
         if (Strings.isNullOrEmpty(cluster)) {
@@ -1144,8 +1175,17 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         } else if (request.isSetToken()) {
             checkToken(request.getToken());
         } else {
-            checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTbl(),
-                    request.getUserIp(), PrivPredicate.LOAD);
+            // TODO: refactor this duplicated per-table privilege check
+            if (CollectionUtils.isNotEmpty(request.getTbls())) {
+                for (String tbl : request.getTbls()) {
+                    checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), tbl,
+                            request.getUserIp(), PrivPredicate.LOAD);
+                }
+            } else {
+                checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
+                        request.getTbl(),
+                        request.getUserIp(), PrivPredicate.LOAD);
+            }
         }
 
         // get database
@@ -1166,9 +1206,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
 
         long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000;
-        OlapTable table = (OlapTable) db.getTableOrMetaException(request.getTbl(), TableType.OLAP);
+        List<Table> tables = queryLoadCommitTables(request, db);
         Env.getCurrentGlobalTransactionMgr()
-                .preCommitTransaction2PC(db, Lists.newArrayList(table), request.getTxnId(),
+                .preCommitTransaction2PC(db, tables, request.getTxnId(),
                         TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
                         TxnCommitAttachment.fromThrift(request.txnCommitAttachment));
     }
@@ -1219,6 +1259,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         DatabaseTransactionMgr dbTransactionMgr = Env.getCurrentGlobalTransactionMgr()
                 .getDatabaseTransactionMgr(database.getId());
         TransactionState transactionState = dbTransactionMgr.getTransactionState(request.getTxnId());
+        LOG.debug("txn {} has multi table {}", request.getTxnId(), transactionState.getTableIdList());
         if (transactionState == null) {
             throw new UserException("transaction [" + request.getTxnId() + "] not found");
         }
@@ -1250,6 +1291,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
     @Override
     public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws TException {
+        multiTableFragmentInstanceIdIndexMap.remove(request.getTxnId());
         String clientAddr = getClientAddrAsString();
         LOG.debug("receive txn commit request: {}, backend: {}", request, clientAddr);
 
@@ -1287,8 +1329,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         } else if (request.isSetToken()) {
             checkToken(request.getToken());
         } else {
-            checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTbl(),
-                    request.getUserIp(), PrivPredicate.LOAD);
+            if (CollectionUtils.isNotEmpty(request.getTbls())) {
+                checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
+                        request.getTbls(), request.getUserIp(), PrivPredicate.LOAD);
+            } else {
+                checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
+                        request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
+            }
         }
 
         // get database
@@ -1307,11 +1354,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             }
             throw new UserException("unknown database, database=" + dbName);
         }
-
         long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000;
-        Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP);
+        List<Table> tables = queryLoadCommitTables(request, db);
         return Env.getCurrentGlobalTransactionMgr()
-                .commitAndPublishTransaction(db, Lists.newArrayList(table), request.getTxnId(),
+                .commitAndPublishTransaction(db, tables, request.getTxnId(),
                         TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
                         TxnCommitAttachment.fromThrift(request.txnCommitAttachment));
     }
@@ -1456,8 +1502,17 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         } else if (request.isSetToken()) {
             checkToken(request.getToken());
         } else {
-            checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTbl(),
-                    request.getUserIp(), PrivPredicate.LOAD);
+            // multi table load
+            if (CollectionUtils.isNotEmpty(request.getTbls())) {
+                for (String tbl : request.getTbls()) {
+                    checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), tbl,
+                            request.getUserIp(), PrivPredicate.LOAD);
+                }
+            } else {
+                checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
+                        request.getTbl(),
+                        request.getUserIp(), PrivPredicate.LOAD);
+            }
         }
         String dbName = ClusterNamespace.getFullName(cluster, request.getDb());
         Database db;
@@ -1601,8 +1656,76 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
     @Override
     public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequest request) {
-        // placeholder
+        List<OlapTable> olapTables;
+        Database db;
+        String fullDbName;
         TStreamLoadMultiTablePutResult result = new TStreamLoadMultiTablePutResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        List<String> tableNames = request.getTableNames();
+        try {
+            if (CollectionUtils.isEmpty(tableNames)) {
+                throw new MetaNotFoundException("table not found");
+            }
+
+            String cluster = request.getCluster();
+            if (Strings.isNullOrEmpty(cluster)) {
+                cluster = SystemInfoService.DEFAULT_CLUSTER;
+            }
+            fullDbName = ClusterNamespace.getFullName(cluster, request.getDb());
+            db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(fullDbName);
+            if (db == null) {
+                String dbName = fullDbName;
+                if (Strings.isNullOrEmpty(request.getCluster())) {
+                    dbName = request.getDb();
+                }
+                throw new UserException("unknown database, database=" + dbName);
+            }
+            // TODO: fetching all tables of the database may be expensive when it holds many tables
+            List<Table> tables = db.getTablesOrEmpty();
+            if (CollectionUtils.isEmpty(tables)) {
+                throw new MetaNotFoundException("table not found");
+            }
+            olapTables = new ArrayList<>(tableNames.size());
+            Map<String, OlapTable> olapTableMap = tables.stream().map(OlapTable.class::cast)
+                    .collect(Collectors.toMap(OlapTable::getName, olapTable -> olapTable));
+            for (String tableName : tableNames) {
+                if (null == olapTableMap.get(tableName)) {
+                    throw new MetaNotFoundException("table not found, table name is " + tableName);
+                }
+                olapTables.add(olapTableMap.get(tableName));
+            }
+        } catch (UserException exception) {
+            LOG.warn("failed to get stream load plan: {}", exception.getMessage());
+            status = new TStatus(TStatusCode.ANALYSIS_ERROR);
+            status.addToErrorMsgs(exception.getMessage());
+            result.setStatus(status);
+            return result;
+        }
+        long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000;
+        List<TExecPlanFragmentParams> planFragmentParamsList = new ArrayList<>(tableNames.size());
+        List<Long> tableIds = olapTables.stream().map(OlapTable::getId).collect(Collectors.toList());
+        // TODO: for multi-table loads, consider the accumulated table lock time against the overall timeout
+        try {
+            multiTableFragmentInstanceIdIndexMap.putIfAbsent(request.getTxnId(), 1);
+            for (OlapTable table : olapTables) {
+                int index = multiTableFragmentInstanceIdIndexMap.get(request.getTxnId());
+                TExecPlanFragmentParams planFragmentParams = generatePlanFragmentParams(request, db, fullDbName,
+                        table, timeoutMs, index);
+                planFragmentParamsList.add(planFragmentParams);
+                multiTableFragmentInstanceIdIndexMap.put(request.getTxnId(), ++index);
+            }
+            Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(db.getId())
+                    .putTransactionTableNames(request.getTxnId(),
+                            tableIds);
+            LOG.debug("receive stream load multi table put request result: {}", result);
+        } catch (Throwable e) {
+            LOG.warn("catch unknown result.", e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
+            return result;
+        }
+        result.setParams(planFragmentParamsList);
         return result;
     }
 
@@ -1624,21 +1747,35 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
         long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000;
         Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP);
+        return generatePlanFragmentParams(request, db, fullDbName, (OlapTable) table, timeoutMs);
+    }
+
+    private TExecPlanFragmentParams generatePlanFragmentParams(TStreamLoadPutRequest request, Database db,
+                                                               String fullDbName, OlapTable table,
+                                                               long timeoutMs) throws UserException {
+        return generatePlanFragmentParams(request, db, fullDbName, table, timeoutMs, 0);
+    }
+
+    private TExecPlanFragmentParams generatePlanFragmentParams(TStreamLoadPutRequest request, Database db,
+                                                               String fullDbName, OlapTable table,
+                                                               long timeoutMs, int multiTableFragmentInstanceIdIndex)
+            throws UserException {
         if (!table.tryReadLock(timeoutMs, TimeUnit.MILLISECONDS)) {
             throw new UserException(
                     "get table read lock timeout, database=" + fullDbName + ",table=" + table.getName());
         }
         try {
             StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
-            StreamLoadPlanner planner = new StreamLoadPlanner(db, (OlapTable) table, streamLoadTask);
-            TExecPlanFragmentParams plan = planner.plan(streamLoadTask.getId());
+            StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
+            TExecPlanFragmentParams plan = planner.plan(streamLoadTask.getId(), multiTableFragmentInstanceIdIndex);
             // add table indexes to transaction state
             TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
                     .getTransactionState(db.getId(), request.getTxnId());
             if (txnState == null) {
                 throw new UserException("txn does not exist: " + request.getTxnId());
             }
-            txnState.addTableIndexes((OlapTable) table);
+            txnState.addTableIndexes(table);
+            plan.setTableName(table.getName());
             return plan;
         } finally {
             table.readUnlock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 92c46307e5..5ac67993e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -78,6 +78,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
@@ -106,6 +107,14 @@ public class DatabaseTransactionMgr {
     // transactionId -> running TransactionState
     private final Map<Long, TransactionState> idToRunningTransactionState = Maps.newHashMap();
 
+    /**
+     * the multi table ids that are in transaction, used to check whether a table is in transaction
+     * multi table transaction state
+     * txnId -> tableId list
+     */
+    private final ConcurrentHashMap<Long, List<Long>> multiTableRunningTransactionTableIdMaps =
+            new ConcurrentHashMap<>();
+
     // transactionId -> final status TransactionState
     private final Map<Long, TransactionState> idToFinalStatusTransactionState = Maps.newHashMap();
 
@@ -859,6 +868,7 @@ public class DatabaseTransactionMgr {
         // a blocking function, the returned result would be the existed table list which hold write lock
         Database db = env.getInternalCatalog().getDbOrMetaException(transactionState.getDbId());
         List<Long> tableIdList = transactionState.getTableIdList();
+        LOG.debug("finish transaction {} with tables {}", transactionId, tableIdList);
         List<? extends TableIf> tableList = db.getTablesOnIdOrderIfExist(tableIdList);
         tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
         try {
@@ -1889,8 +1899,34 @@ public class DatabaseTransactionMgr {
         final StackTraceElement[] stackTrace = t.getStackTrace();
         StringBuilder msgBuilder = new StringBuilder();
         for (StackTraceElement e : stackTrace) {
-            msgBuilder.append(e.toString() + "\n");
+            msgBuilder.append(e.toString()).append("\n");
         }
         return msgBuilder.toString();
     }
+
+    public void putTransactionTableNames(long transactionId, List<Long> tableIds) {
+        if (CollectionUtils.isEmpty(tableIds)) {
+            return;
+        }
+        if (multiTableRunningTransactionTableIdMaps.containsKey(transactionId)) {
+            multiTableRunningTransactionTableIdMaps.get(transactionId).addAll(tableIds);
+            return;
+        }
+        multiTableRunningTransactionTableIdMaps.put(transactionId, tableIds);
+    }
+
+    /**
+     * Update transaction table ids by transaction id.
+     * it's used for multi table transaction.
+     */
+    public void updateMultiTableRunningTransactionTableIds(long transactionId, List<Long> tableIds) {
+        if (CollectionUtils.isEmpty(tableIds)) {
+            return;
+        }
+        // only update the table id list of transactions that are still running
+        if (null == idToRunningTransactionState.get(transactionId)) {
+            return;
+        }
+        idToRunningTransactionState.get(transactionId).setTableIdList(tableIds);
+    }
 }
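
putTransactionTableNames and updateMultiTableRunningTransactionTableIds keep track of which table ids a multi-table transaction has touched, keyed by txn id. A simplified, self-contained sketch of that bookkeeping with a hypothetical class name, not the Doris manager itself:

    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;

    // Simplified sketch of per-transaction table id bookkeeping: table ids reported
    // for the same txn id are accumulated rather than overwritten.
    public final class TxnTableIdTrackerSketch {

        private final ConcurrentHashMap<Long, List<Long>> txnToTableIds = new ConcurrentHashMap<>();

        public void putTransactionTableIds(long txnId, List<Long> tableIds) {
            if (tableIds == null || tableIds.isEmpty()) {
                return;
            }
            txnToTableIds.computeIfAbsent(txnId, k -> new CopyOnWriteArrayList<>()).addAll(tableIds);
        }

        public List<Long> tableIdsOf(long txnId) {
            return txnToTableIds.getOrDefault(txnId, Collections.emptyList());
        }
    }
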
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index b3a0284c17..10ce3542f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -35,6 +35,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -175,6 +177,8 @@ public class TransactionState implements Writable {
     @SerializedName(value = "dbId")
     private long dbId;
     @SerializedName(value = "tableIdList")
+    @Setter
+    @Getter
     private List<Long> tableIdList;
     private int replicaNum = 0;
     @SerializedName(value = "txnId")
@@ -555,16 +559,10 @@ public class TransactionState implements Writable {
     }
 
     public synchronized void addTableIndexes(OlapTable table) {
-        Set<Long> indexIds = loadedTblIndexes.get(table.getId());
-        if (indexIds == null) {
-            indexIds = Sets.newHashSet();
-            loadedTblIndexes.put(table.getId(), indexIds);
-        }
+        Set<Long> indexIds = loadedTblIndexes.computeIfAbsent(table.getId(), k -> Sets.newHashSet());
         // always equal the index ids
         indexIds.clear();
-        for (Long indexId : table.getIndexIdToMeta().keySet()) {
-            indexIds.add(indexId);
-        }
+        indexIds.addAll(table.getIndexIdToMeta().keySet());
     }
 
     public Map<Long, Set<Long>> getLoadedTblIndexes() {
@@ -651,8 +649,8 @@ public class TransactionState implements Writable {
         out.writeLong(callbackId);
         out.writeLong(timeoutMs);
         out.writeInt(tableIdList.size());
-        for (int i = 0; i < tableIdList.size(); i++) {
-            out.writeLong(tableIdList.get(i));
+        for (Long aLong : tableIdList) {
+            out.writeLong(aLong);
         }
     }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
index a59314952a..51c5177b84 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
@@ -205,4 +205,72 @@ public class CreateRoutineLoadStmtTest {
         Assert.assertEquals("+08:00", createRoutineLoadStmt.getTimezone());
     }
 
+    @Test
+    public void testMultiTableAnalyze(@Injectable Analyzer analyzer,
+                                      @Injectable SessionVariable sessionVariable) throws UserException {
+        String jobName = "job1";
+        String dbName = "db1";
+        LabelName labelName = new LabelName(dbName, jobName);
+        String topicName = "topic1";
+        String serverAddress = "127.0.0.1:8080";
+        String kafkaPartitionString = "1,2,3";
+        String timeZone = "8:00";
+        List<String> partitionNameString = Lists.newArrayList();
+        partitionNameString.add("p1");
+        PartitionNames partitionNames = new PartitionNames(false, partitionNameString);
+        Separator columnSeparator = new Separator(",");
+
+        // duplicate load property
+        List<ParseNode> loadPropertyList = new ArrayList<>();
+        loadPropertyList.add(columnSeparator);
+        loadPropertyList.add(partitionNames);
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
+        properties.put(LoadStmt.TIMEZONE, timeZone);
+        String typeName = LoadDataSourceType.KAFKA.name();
+        Map<String, String> customProperties = Maps.newHashMap();
+
+        customProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(), topicName);
+        customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), serverAddress);
+        customProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), kafkaPartitionString);
+        customProperties.put(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_DELIMITER.getName(), "\t");
+        customProperties.put(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_INDEX.getName(), "2");
+        customProperties.put(KafkaConfiguration.KAFKA_TABLE_NAME_FORMAT.getName(), "TEXT");
+
+        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, null,
+                loadPropertyList, properties,
+                typeName, customProperties,
+                LoadTask.MergeType.APPEND, "");
+        new MockUp<StatementBase>() {
+            @Mock
+            public void analyze(Analyzer analyzer1) {
+                return;
+            }
+        };
+
+        new Expectations() {
+            {
+                ctx.getSessionVariable();
+                result = sessionVariable;
+                sessionVariable.getSendBatchParallelism();
+                result = 1;
+            }
+        };
+
+        createRoutineLoadStmt.analyze(analyzer);
+
+        Assert.assertNotNull(createRoutineLoadStmt.getRoutineLoadDesc());
+        Assert.assertEquals(columnSeparator, createRoutineLoadStmt.getRoutineLoadDesc().getColumnSeparator());
+        Assert.assertEquals(partitionNames.getPartitionNames(), createRoutineLoadStmt.getRoutineLoadDesc().getPartitionNames().getPartitionNames());
+        Assert.assertEquals(2, createRoutineLoadStmt.getDesiredConcurrentNum());
+        Assert.assertEquals(0, createRoutineLoadStmt.getMaxErrorNum());
+        KafkaDataSourceProperties kafkaDataSourceProperties = (KafkaDataSourceProperties) createRoutineLoadStmt.getDataSourceProperties();
+        Assert.assertEquals(serverAddress, kafkaDataSourceProperties.getBrokerList());
+        Assert.assertEquals(topicName, kafkaDataSourceProperties.getTopic());
+        Assert.assertEquals("+08:00", createRoutineLoadStmt.getTimezone());
+        Assert.assertEquals("\t", kafkaDataSourceProperties.getTableNameProperties().get(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_DELIMITER.getName()));
+        Assert.assertEquals("2", kafkaDataSourceProperties.getTableNameProperties().get(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_INDEX.getName()));
+        Assert.assertEquals("TEXT", kafkaDataSourceProperties.getTableNameProperties().get(KafkaConfiguration.KAFKA_TABLE_NAME_FORMAT.getName()));
+    }
+
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 2c0a1d13d4..57ded401bd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -224,7 +224,7 @@ public class KafkaRoutineLoadJobTest {
         Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
         partitionIdsToOffset.put(100, 0L);
         KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster",
-                maxBatchIntervalS * 2 * 1000, partitionIdsToOffset);
+                maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false);
         kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() - maxBatchIntervalS * 2 * 1000 - 1);
         routineLoadTaskInfoList.add(kafkaTaskInfo);
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index ddca6fdf4e..e0fdd92f73 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -70,7 +70,7 @@ public class RoutineLoadTaskSchedulerTest {
 
         Queue<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue();
         KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, "default_cluster", 20000,
-                partitionIdToOffset);
+                partitionIdToOffset, false);
         routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);
 
         Map<Long, RoutineLoadTaskInfo> idToRoutineLoadTask = Maps.newHashMap();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index c95c6a25a7..953ea3d879 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -322,7 +322,7 @@ public class GlobalTransactionMgrTest {
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
         KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "default_cluster", 20000,
-                partitionIdToOffset);
+                partitionIdToOffset, false);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null,
@@ -394,7 +394,7 @@ public class GlobalTransactionMgrTest {
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
         KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(UUID.randomUUID(), 1L, "defualt_cluster", 20000,
-                partitionIdToOffset);
+                partitionIdToOffset, false);
         Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
         routineLoadTaskInfoList.add(routineLoadTaskInfo);
         TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org