You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/04/18 06:22:34 UTC
[incubator-seatunnel] branch dev updated: [Hotfix][Connector-V2][RocketMQ] Fix rocketmq spark e2e test cases (#4583)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e711f6ef4 [Hotfix][Connector-V2][RocketMQ] Fix rocketmq spark e2e test cases (#4583)
e711f6ef4 is described below
commit e711f6ef4c1615d2ec8604e3d3bbc48412f564c3
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Tue Apr 18 14:22:29 2023 +0800
[Hotfix][Connector-V2][RocketMQ] Fix rocketmq spark e2e test cases (#4583)
---
.../rocketmq/common/RocketMqAdminUtil.java | 6 +-
.../rocketmq/source/RocketMqConsumerThread.java | 20 +++--
.../rocketmq/source/RocketMqSourceReader.java | 5 +-
.../source/RocketMqSourceSplitEnumerator.java | 5 +-
.../e2e/connector/rocketmq/RocketMqIT.java | 1 -
.../resources/rocketmq-source_json_to_console.conf | 97 +++++++++++-----------
.../resources/rocketmq-source_text_to_console.conf | 93 +++++++++++----------
.../rocketmq_source_earliest_to_console.conf | 57 +++++++------
.../rocketmq_source_group_offset_to_console.conf | 57 +++++++------
.../rocketmq_source_latest_to_console.conf | 56 +++++++------
...ocketmq_source_specific_offsets_to_console.conf | 59 ++++++-------
.../rocketmq_source_timestamp_to_console.conf | 61 +++++++-------
12 files changed, 272 insertions(+), 245 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java
index ee831257a..8f2c59dcd 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.rocketmq.common;
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+import org.apache.seatunnel.shade.com.google.common.collect.Maps;
+
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
@@ -44,9 +47,6 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
index 0c0786569..bfd34c303 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
@@ -50,16 +50,20 @@ public class RocketMqConsumerThread implements Runnable {
@Override
public void run() {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- Consumer<DefaultLitePullConsumer> task = tasks.poll(1, TimeUnit.SECONDS);
- if (task != null) {
- task.accept(consumer);
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ Consumer<DefaultLitePullConsumer> task = tasks.poll(1, TimeUnit.SECONDS);
+ if (task != null) {
+ task.accept(consumer);
+ }
+ } catch (InterruptedException e) {
+ throw new RocketMqConnectorException(
+ RocketMqConnectorErrorCode.CONSUME_THREAD_RUN_ERROR, e);
}
- } catch (InterruptedException e) {
- throw new RocketMqConnectorException(
- RocketMqConnectorErrorCode.CONSUME_THREAD_RUN_ERROR, e);
}
+ } finally {
+ this.consumer.shutdown();
}
}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
index c44307d16..42c3788e8 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+import org.apache.seatunnel.shade.com.google.common.collect.Maps;
+import org.apache.seatunnel.shade.com.google.common.collect.Sets;
+
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
@@ -28,8 +31,6 @@ import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConn
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
index 9a4912cd9..d933fe2e9 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+import org.apache.seatunnel.shade.com.google.common.collect.Maps;
+import org.apache.seatunnel.shade.com.google.common.collect.Sets;
+
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
@@ -28,8 +31,6 @@ import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
index fed95a41d..3b0cba105 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
@@ -185,7 +185,6 @@ public class RocketMqIT extends TestSuiteBase implements TestResource {
ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
Assertions.assertTrue(objectNode.has("c_map"));
Assertions.assertTrue(objectNode.has("c_string"));
- Assertions.assertEquals(10, data.size());
}
@TestTemplate
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
index 40f4f1f2d..5094c5c49 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
@@ -19,15 +19,15 @@
######
env {
- execution.parallelism = 1
- job.mode = "BATCH"
+ execution.parallelism = 1
+ job.mode = "BATCH"
- # You can set spark configuration here
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 1
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
}
source {
@@ -37,21 +37,21 @@ source {
result_table_name = "rocketmq_table"
schema = {
fields {
- id = bigint
- c_map = "map<string, smallint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
}
}
}
@@ -62,30 +62,33 @@ transform {
}
sink {
- Console {}
+ Console {
+ source_table_name = "rocketmq_table"
+ }
Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
+ source_table_name = "rocketmq_table"
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
- }
+ }
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
index d04cda5b4..04d33aa6f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
@@ -19,15 +19,15 @@
######
env {
- execution.parallelism = 1
- job.mode = "BATCH"
+ execution.parallelism = 1
+ job.mode = "BATCH"
- # You can set spark configuration here
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 1
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
}
source {
@@ -37,21 +37,21 @@ source {
result_table_name = "rocketmq_table"
schema = {
fields {
- id = bigint
- c_map = "map<string, smallint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
}
}
format = text
@@ -64,28 +64,31 @@ transform {
}
sink {
- Console {}
+ Console {
+ source_table_name = "rocketmq_table"
+ }
Assert {
- rules = {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
+ source_table_name = "rocketmq_table"
+ rules = {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
}
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
index 07e3ad2bf..f1f376f46 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
@@ -34,36 +34,39 @@ source {
format = json
start.mode = "CONSUME_FROM_FIRST_OFFSET"
schema = {
- fields {
- id = bigint
- }
- }
+ fields {
+ id = bigint
+ }
+ }
}
}
transform {
- }
+}
-sink {
- Console {}
- Assert {
- rules = {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
- }
+sink {
+ Console {
+ source_table_name = "rocketmq_table"
+ }
+ Assert {
+ source_table_name = "rocketmq_table"
+ rules = {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
index ce881c0e9..300207501 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
@@ -33,37 +33,40 @@ source {
format = json
start.mode = "CONSUME_FROM_GROUP_OFFSETS"
schema = {
- fields {
- id = bigint
- }
- }
+ fields {
+ id = bigint
+ }
+ }
}
}
transform {
- }
+}
-sink {
- Console {}
- Assert {
- rules = {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
+sink {
+ Console {
+ source_table_name = "rocketmq_table"
+ }
+ Assert {
+ source_table_name = "rocketmq_table"
+ rules = {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
- {
- rule_type = MIN
- rule_value = 100
- },
- {
- rule_type = MAX
- rule_value = 149
- }
- ]
- }
- ]
- }
- }
+ {
+ rule_type = MIN
+ rule_value = 100
+ },
+ {
+ rule_type = MAX
+ rule_value = 149
+ }
+ ]
+ }
+ ]
+ }
+ }
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
index 65eae9194..d50af535b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
@@ -33,36 +33,40 @@ source {
format = json
start.mode = "CONSUME_FROM_LAST_OFFSET"
schema = {
- fields {
- id = bigint
- }
- }
+ fields {
+ id = bigint
+ }
+ }
}
}
transform {
}
-sink {
- Console {}
- Assert {
- rules = {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = MIN
- rule_value = 99
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
- }
+sink {
+ Console {
+ source_table_name = "rocketmq_table"
+ }
+ Assert {
+ source_table_name = "rocketmq_table"
+
+ rules = {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = MIN
+ rule_value = 99
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
index 7b33f49c4..ee25951b9 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
@@ -34,40 +34,43 @@ source {
format = json
start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
schema = {
- fields {
- id = bigint
- }
- }
+ fields {
+ id = bigint
+ }
+ }
start.mode.offsets = {
- test_topic_source-0 = 50
- }
+ test_topic_source-0 = 50
+ }
}
}
transform {
}
-sink {
- Console {}
- Assert {
- rules = {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = MIN
- rule_value = 50
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
- }
+sink {
+ Console {
+ source_table_name = "rocketmq_table"
+ }
+ Assert {
+ source_table_name = "rocketmq_table"
+ rules = {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = MIN
+ rule_value = 50
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
index 2f55a8005..1465419c9 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
@@ -34,39 +34,42 @@ source {
format = json
start.mode = "CONSUME_FROM_TIMESTAMP"
schema = {
- fields {
- id = bigint
- }
- }
- start.mode.timestamp = 1667179890315
+ fields {
+ id = bigint
+ }
+ }
+ start.mode.timestamp = 1667179890315
}
}
transform {
}
-sink {
- Console {}
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
+sink {
+ Console {
+ source_table_name = "rocketmq_table"
+ }
+ Assert {
+ source_table_name = "rocketmq_table"
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
- }
- }
\ No newline at end of file
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file