You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/19 05:09:41 UTC
[incubator-seatunnel] branch dev updated: [Hotfix][Connector-V2][RocketMQ] Fix rocketmq e2e test cases #4610 (#4614)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d16ff4d94 [Hotfix][Connector-V2][RocketMQ] Fix rocketmq e2e test cases #4610 (#4614)
d16ff4d94 is described below
commit d16ff4d94f2a54675a027fc12cd2dfcb7a687170
Author: Xiaojian Sun <su...@163.com>
AuthorDate: Wed Apr 19 13:09:33 2023 +0800
[Hotfix][Connector-V2][RocketMQ] Fix rocketmq e2e test cases #4610 (#4614)
---
.../e2e/connector/rocketmq/RocketMqContainer.java | 1 +
.../e2e/connector/rocketmq/RocketMqIT.java | 100 ++++++++++++---------
2 files changed, 61 insertions(+), 40 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqContainer.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqContainer.java
index 94c50ae05..9206c14b8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqContainer.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqContainer.java
@@ -42,6 +42,7 @@ public class RocketMqContainer extends GenericContainer<RocketMqContainer> {
public RocketMqContainer(DockerImageName image) {
super(image);
withExposedPorts(NAMESRV_PORT, BROKER_PORT, BROKER_PORT - 2);
+ this.withEnv("JAVA_OPT_EXT", "-Xms512m -Xmx512m");
}
@Override
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
index 3b0cba105..f292d1120 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
@@ -185,6 +185,7 @@ public class RocketMqIT extends TestSuiteBase implements TestResource {
ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
Assertions.assertTrue(objectNode.has("c_map"));
Assertions.assertTrue(objectNode.has("c_string"));
+ Assertions.assertEquals(10, data.size());
}
@TestTemplate
@@ -309,51 +310,69 @@ public class RocketMqIT extends TestSuiteBase implements TestResource {
}
}
- @SneakyThrows
private Map<String, String> getRocketMqConsumerData(String topicName) {
Map<String, String> data = new HashMap<>();
- DefaultLitePullConsumer consumer =
- RocketMqAdminUtil.initDefaultLitePullConsumer(newConfiguration(), false);
- consumer.start();
- // assign
- Map<MessageQueue, TopicOffset> queueOffsets =
- RetryUtils.retryWithException(
- () -> {
- return RocketMqAdminUtil.offsetTopics(
- newConfiguration(), Lists.newArrayList(topicName))
- .get(0);
- },
- new RetryUtils.RetryMaterial(
- Constant.OPERATION_RETRY_TIME,
- false,
- exception -> exception instanceof RocketMqConnectorException,
- Constant.OPERATION_RETRY_SLEEP));
- consumer.assign(queueOffsets.keySet());
- // seek to offset
- Map<MessageQueue, Long> currentOffsets =
- RocketMqAdminUtil.currentOffsets(
- newConfiguration(), Lists.newArrayList(topicName), queueOffsets.keySet());
- for (MessageQueue mq : queueOffsets.keySet()) {
- long currentOffset =
- currentOffsets.containsKey(mq)
- ? currentOffsets.get(mq)
- : queueOffsets.get(mq).getMinOffset();
- consumer.seek(mq, currentOffset);
- }
- while (true) {
- List<MessageExt> messages = consumer.poll(5000);
- if (messages.isEmpty()) {
- break;
+ try {
+ DefaultLitePullConsumer consumer =
+ RocketMqAdminUtil.initDefaultLitePullConsumer(newConfiguration(), false);
+ consumer.start();
+ // assign
+ Map<MessageQueue, TopicOffset> queueOffsets =
+ RetryUtils.retryWithException(
+ () -> {
+ return RocketMqAdminUtil.offsetTopics(
+ newConfiguration(), Lists.newArrayList(topicName))
+ .get(0);
+ },
+ new RetryUtils.RetryMaterial(
+ Constant.OPERATION_RETRY_TIME,
+ false,
+ exception -> exception instanceof RocketMqConnectorException,
+ Constant.OPERATION_RETRY_SLEEP));
+ consumer.assign(queueOffsets.keySet());
+ // seek to offset
+ Map<MessageQueue, Long> currentOffsets =
+ RocketMqAdminUtil.currentOffsets(
+ newConfiguration(),
+ Lists.newArrayList(topicName),
+ queueOffsets.keySet());
+ for (MessageQueue mq : queueOffsets.keySet()) {
+ long currentOffset =
+ currentOffsets.containsKey(mq)
+ ? currentOffsets.get(mq)
+ : queueOffsets.get(mq).getMinOffset();
+ consumer.seek(mq, currentOffset);
}
- for (MessageExt message : messages) {
- data.put(message.getKeys(), new String(message.getBody(), StandardCharsets.UTF_8));
+ while (true) {
+ List<MessageExt> messages = consumer.poll(5000);
+ if (messages.isEmpty()) {
+ break;
+ }
+ for (MessageExt message : messages) {
+ data.put(
+ message.getKeys(),
+ new String(message.getBody(), StandardCharsets.UTF_8));
+ consumer.getOffsetStore()
+ .updateConsumeOffsetToBroker(
+ new MessageQueue(
+ message.getTopic(),
+ message.getBrokerName(),
+ message.getQueueId()),
+ message.getQueueOffset(),
+ false);
+ }
+ consumer.commitSync();
}
- consumer.commitSync();
- }
- if (consumer != null) {
- consumer.shutdown();
+ if (consumer != null) {
+ consumer.shutdown();
+ }
+ log.info("Consumer {} data total {}", topicName, data.size());
+ // consumer.commitSync() only submits the offset to the broker, and NameServer scans the
+ // broker to update the offset every 10 seconds
+ Thread.sleep(20 * 1000);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
}
- log.info("Consumer {} data total {}", topicName, data.size());
return data;
}
@@ -362,6 +381,7 @@ public class RocketMqIT extends TestSuiteBase implements TestResource {
.groupId(ROCKETMQ_GROUP)
.aclEnable(false)
.namesrvAddr(rocketMqContainer.getNameSrvAddr())
+ .batchSize(10)
.build();
}