You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/04/18 06:22:34 UTC

[incubator-seatunnel] branch dev updated: [Hotfix][Connector-V2][RocketMQ] Fix rocketmq spark e2e test cases (#4583)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e711f6ef4 [Hotfix][Connector-V2][RocketMQ] Fix rocketmq spark e2e test cases (#4583)
e711f6ef4 is described below

commit e711f6ef4c1615d2ec8604e3d3bbc48412f564c3
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Tue Apr 18 14:22:29 2023 +0800

    [Hotfix][Connector-V2][RocketMQ] Fix rocketmq spark e2e test cases (#4583)
---
 .../rocketmq/common/RocketMqAdminUtil.java         |  6 +-
 .../rocketmq/source/RocketMqConsumerThread.java    | 20 +++--
 .../rocketmq/source/RocketMqSourceReader.java      |  5 +-
 .../source/RocketMqSourceSplitEnumerator.java      |  5 +-
 .../e2e/connector/rocketmq/RocketMqIT.java         |  1 -
 .../resources/rocketmq-source_json_to_console.conf | 97 +++++++++++-----------
 .../resources/rocketmq-source_text_to_console.conf | 93 +++++++++++----------
 .../rocketmq_source_earliest_to_console.conf       | 57 +++++++------
 .../rocketmq_source_group_offset_to_console.conf   | 57 +++++++------
 .../rocketmq_source_latest_to_console.conf         | 56 +++++++------
 ...ocketmq_source_specific_offsets_to_console.conf | 59 ++++++-------
 .../rocketmq_source_timestamp_to_console.conf      | 61 +++++++-------
 12 files changed, 272 insertions(+), 245 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java
index ee831257a..8f2c59dcd 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.rocketmq.common;
 
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+import org.apache.seatunnel.shade.com.google.common.collect.Maps;
+
 import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
 
@@ -44,9 +47,6 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.CommandUtil;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
index 0c0786569..bfd34c303 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
@@ -50,16 +50,20 @@ public class RocketMqConsumerThread implements Runnable {
 
     @Override
     public void run() {
-        while (!Thread.currentThread().isInterrupted()) {
-            try {
-                Consumer<DefaultLitePullConsumer> task = tasks.poll(1, TimeUnit.SECONDS);
-                if (task != null) {
-                    task.accept(consumer);
+        try {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    Consumer<DefaultLitePullConsumer> task = tasks.poll(1, TimeUnit.SECONDS);
+                    if (task != null) {
+                        task.accept(consumer);
+                    }
+                } catch (InterruptedException e) {
+                    throw new RocketMqConnectorException(
+                            RocketMqConnectorErrorCode.CONSUME_THREAD_RUN_ERROR, e);
                 }
-            } catch (InterruptedException e) {
-                throw new RocketMqConnectorException(
-                        RocketMqConnectorErrorCode.CONSUME_THREAD_RUN_ERROR, e);
             }
+        } finally {
+            this.consumer.shutdown();
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
index c44307d16..42c3788e8 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
 
+import org.apache.seatunnel.shade.com.google.common.collect.Maps;
+import org.apache.seatunnel.shade.com.google.common.collect.Sets;
+
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
@@ -28,8 +31,6 @@ import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConn
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
index 9a4912cd9..d933fe2e9 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
 
+import org.apache.seatunnel.shade.com.google.common.collect.Maps;
+import org.apache.seatunnel.shade.com.google.common.collect.Sets;
+
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
@@ -28,8 +31,6 @@ import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageQueue;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
index fed95a41d..3b0cba105 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
@@ -185,7 +185,6 @@ public class RocketMqIT extends TestSuiteBase implements TestResource {
         ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
         Assertions.assertTrue(objectNode.has("c_map"));
         Assertions.assertTrue(objectNode.has("c_string"));
-        Assertions.assertEquals(10, data.size());
     }
 
     @TestTemplate
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
index 40f4f1f2d..5094c5c49 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
@@ -19,15 +19,15 @@
 ######
 
 env {
-    execution.parallelism = 1
-    job.mode = "BATCH"
+  execution.parallelism = 1
+  job.mode = "BATCH"
 
-    # You can set spark configuration here
-    spark.app.name = "SeaTunnel"
-    spark.executor.instances = 1
-    spark.executor.cores = 1
-    spark.executor.memory = "1g"
-    spark.master = local
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
 }
 
 source {
@@ -37,21 +37,21 @@ source {
     result_table_name = "rocketmq_table"
     schema = {
       fields {
-           id = bigint
-           c_map = "map<string, smallint>"
-           c_array = "array<tinyint>"
-           c_string = string
-           c_boolean = boolean
-           c_tinyint = tinyint
-           c_smallint = smallint
-           c_int = int
-           c_bigint = bigint
-           c_float = float
-           c_double = double
-           c_decimal = "decimal(2, 1)"
-           c_bytes = bytes
-           c_date = date
-           c_timestamp = timestamp
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
       }
     }
   }
@@ -62,30 +62,33 @@ transform {
 }
 
 sink {
-  Console {}
+  Console {
+    source_table_name = "rocketmq_table"
+  }
   Assert {
-     rules =
-       {
-         field_rules = [
-             {
-                 field_name = id
-                 field_type = long
-                 field_value = [
-                     {
-                         rule_type = NOT_NULL
-                     },
-                     {
-                         rule_type = MIN
-                         rule_value = 0
-                     },
-                     {
-                         rule_type = MAX
-                         rule_value = 99
-                     }
-                 ]
-             }
-         ]
-       }
+    source_table_name = "rocketmq_table"
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = long
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              },
+              {
+                rule_type = MIN
+                rule_value = 0
+              },
+              {
+                rule_type = MAX
+                rule_value = 99
+              }
+            ]
+          }
+        ]
+      }
 
-   }
+  }
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
index d04cda5b4..04d33aa6f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
@@ -19,15 +19,15 @@
 ######
 
 env {
-    execution.parallelism = 1
-    job.mode = "BATCH"
+  execution.parallelism = 1
+  job.mode = "BATCH"
 
-    # You can set spark configuration here
-    spark.app.name = "SeaTunnel"
-    spark.executor.instances = 1
-    spark.executor.cores = 1
-    spark.executor.memory = "1g"
-    spark.master = local
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
 }
 
 source {
@@ -37,21 +37,21 @@ source {
     result_table_name = "rocketmq_table"
     schema = {
       fields {
-           id = bigint
-           c_map = "map<string, smallint>"
-           c_array = "array<tinyint>"
-           c_string = string
-           c_boolean = boolean
-           c_tinyint = tinyint
-           c_smallint = smallint
-           c_int = int
-           c_bigint = bigint
-           c_float = float
-           c_double = double
-           c_decimal = "decimal(2, 1)"
-           c_bytes = bytes
-           c_date = date
-           c_timestamp = timestamp
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
       }
     }
     format = text
@@ -64,28 +64,31 @@ transform {
 }
 
 sink {
-  Console {}
+  Console {
+    source_table_name = "rocketmq_table"
+  }
   Assert {
-     rules = {
-         field_rules = [
- 		   {
-             field_name = id
-             field_type = long
-             field_value = [
-                 {
-                     rule_type = NOT_NULL
-                 },
-                 {
-                     rule_type = MIN
-                     rule_value = 0
-                 },
-                 {
-                     rule_type = MAX
-                     rule_value = 99
-                 }
-             ]
-           }
-        ]
-     }
+    source_table_name = "rocketmq_table"
+    rules = {
+      field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            },
+            {
+              rule_type = MIN
+              rule_value = 0
+            },
+            {
+              rule_type = MAX
+              rule_value = 99
+            }
+          ]
+        }
+      ]
+    }
   }
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
index 07e3ad2bf..f1f376f46 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
@@ -34,36 +34,39 @@ source {
     format = json
     start.mode = "CONSUME_FROM_FIRST_OFFSET"
     schema = {
-       fields {
-            id = bigint
-       }
-     }
+      fields {
+        id = bigint
+      }
+    }
   }
 }
 
 transform {
- }
+}
 
-sink  {
-       Console {}
-       Assert {
-           rules = {
-               field_rules = [
-                  {
-                    field_name = id
-                    field_type = long
-                    field_value = [
-                             {
-                                 rule_type = MIN
-                                 rule_value = 0
-                             },
-                             {
-                                 rule_type = MAX
-                                 rule_value = 99
-                             }
-                     ]
-                  }
-               ]
-           }
-       }
+sink {
+  Console {
+    source_table_name = "rocketmq_table"
+  }
+  Assert {
+    source_table_name = "rocketmq_table"
+    rules = {
+      field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [
+            {
+              rule_type = MIN
+              rule_value = 0
+            },
+            {
+              rule_type = MAX
+              rule_value = 99
+            }
+          ]
+        }
+      ]
+    }
+  }
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
index ce881c0e9..300207501 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
@@ -33,37 +33,40 @@ source {
     format = json
     start.mode = "CONSUME_FROM_GROUP_OFFSETS"
     schema = {
-       fields {
-            id = bigint
-       }
-     }
+      fields {
+        id = bigint
+      }
+    }
   }
 }
 
 transform {
- }
+}
 
-sink  {
-       Console {}
-       Assert {
-            rules = {
-                  field_rules = [
-        	      	  {
-                         field_name = id
-                         field_type = long
-                         field_value = [
+sink {
+  Console {
+    source_table_name = "rocketmq_table"
+  }
+  Assert {
+    source_table_name = "rocketmq_table"
+    rules = {
+      field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [
 
-                             {
-                                 rule_type = MIN
-                                 rule_value = 100
-                             },
-                             {
-                                 rule_type = MAX
-                                 rule_value = 149
-                             }
-                         ]
-                      }
-                  ]
-             }
-       }
+            {
+              rule_type = MIN
+              rule_value = 100
+            },
+            {
+              rule_type = MAX
+              rule_value = 149
+            }
+          ]
+        }
+      ]
+    }
+  }
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
index 65eae9194..d50af535b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
@@ -33,36 +33,40 @@ source {
     format = json
     start.mode = "CONSUME_FROM_LAST_OFFSET"
     schema = {
-       fields {
-            id = bigint
-       }
-     }
+      fields {
+        id = bigint
+      }
+    }
   }
 }
 
 transform {
 }
 
-sink  {
-       Console {}
-       Assert {
-           rules = {
-               field_rules = [
-        	       {
-                       field_name = id
-                       field_type = long
-                       field_value = [
-                             {
-                                 rule_type = MIN
-                                 rule_value = 99
-                             },
-                             {
-                                 rule_type = MAX
-                                 rule_value = 99
-                             }
-                        ]
-                   }
-               ]
-           }
-      }
+sink {
+  Console {
+    source_table_name = "rocketmq_table"
+  }
+  Assert {
+    source_table_name = "rocketmq_table"
+
+    rules = {
+      field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [
+            {
+              rule_type = MIN
+              rule_value = 99
+            },
+            {
+              rule_type = MAX
+              rule_value = 99
+            }
+          ]
+        }
+      ]
+    }
+  }
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
index 7b33f49c4..ee25951b9 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
@@ -34,40 +34,43 @@ source {
     format = json
     start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
     schema = {
-       fields {
-            id = bigint
-       }
-     }
+      fields {
+        id = bigint
+      }
+    }
 
     start.mode.offsets = {
-                test_topic_source-0 = 50
-             }
+      test_topic_source-0 = 50
+    }
   }
 }
 
 transform {
 }
 
-sink  {
-       Console {}
-       Assert {
-             rules = {
-                   field_rules = [
-        	      	   {
-                         field_name = id
-                         field_type = long
-                         field_value = [
-                             {
-                                 rule_type = MIN
-                                 rule_value = 50
-                             },
-                             {
-                                 rule_type = MAX
-                                 rule_value = 99
-                             }
-                         ]
-                      }
-                   ]
-             }
-       }
+sink {
+  Console {
+    source_table_name = "rocketmq_table"
+  }
+  Assert {
+    source_table_name = "rocketmq_table"
+    rules = {
+      field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [
+            {
+              rule_type = MIN
+              rule_value = 50
+            },
+            {
+              rule_type = MAX
+              rule_value = 99
+            }
+          ]
+        }
+      ]
+    }
+  }
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
index 2f55a8005..1465419c9 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
@@ -34,39 +34,42 @@ source {
     format = json
     start.mode = "CONSUME_FROM_TIMESTAMP"
     schema = {
-       fields {
-            id = bigint
-       }
-     }
-     start.mode.timestamp = 1667179890315
+      fields {
+        id = bigint
+      }
+    }
+    start.mode.timestamp = 1667179890315
   }
 }
 
 transform {
 }
 
-sink  {
-       Console {}
-       Assert {
-            rules =
-                {
-                   field_rules = [
-        	      	   {
-                         field_name = id
-                         field_type = long
-                         field_value = [
+sink {
+  Console {
+    source_table_name = "rocketmq_table"
+  }
+  Assert {
+    source_table_name = "rocketmq_table"
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = long
+            field_value = [
 
-                             {
-                                 rule_type = MIN
-                                 rule_value = 0
-                             },
-                             {
-                                 rule_type = MAX
-                                 rule_value = 99
-                             }
-                         ]
-                      }
-                   ]
-                }
-            }
-      }
\ No newline at end of file
+              {
+                rule_type = MIN
+                rule_value = 0
+              },
+              {
+                rule_type = MAX
+                rule_value = 99
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file