You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/19 05:09:41 UTC

[incubator-seatunnel] branch dev updated: [Hotfix][Connector-V2][RocketMQ] Fix rocketmq e2e test cases #4610 (#4614)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d16ff4d94 [Hotfix][Connector-V2][RocketMQ] Fix rocketmq e2e test cases #4610 (#4614)
d16ff4d94 is described below

commit d16ff4d94f2a54675a027fc12cd2dfcb7a687170
Author: Xiaojian Sun <su...@163.com>
AuthorDate: Wed Apr 19 13:09:33 2023 +0800

    [Hotfix][Connector-V2][RocketMQ] Fix rocketmq e2e test cases #4610 (#4614)
---
 .../e2e/connector/rocketmq/RocketMqContainer.java  |   1 +
 .../e2e/connector/rocketmq/RocketMqIT.java         | 100 ++++++++++++---------
 2 files changed, 61 insertions(+), 40 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqContainer.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqContainer.java
index 94c50ae05..9206c14b8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqContainer.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqContainer.java
@@ -42,6 +42,7 @@ public class RocketMqContainer extends GenericContainer<RocketMqContainer> {
     public RocketMqContainer(DockerImageName image) {
         super(image);
         withExposedPorts(NAMESRV_PORT, BROKER_PORT, BROKER_PORT - 2);
+        this.withEnv("JAVA_OPT_EXT", "-Xms512m -Xmx512m");
     }
 
     @Override
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
index 3b0cba105..f292d1120 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
@@ -185,6 +185,7 @@ public class RocketMqIT extends TestSuiteBase implements TestResource {
         ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
         Assertions.assertTrue(objectNode.has("c_map"));
         Assertions.assertTrue(objectNode.has("c_string"));
+        Assertions.assertEquals(10, data.size());
     }
 
     @TestTemplate
@@ -309,51 +310,69 @@ public class RocketMqIT extends TestSuiteBase implements TestResource {
         }
     }
 
-    @SneakyThrows
     private Map<String, String> getRocketMqConsumerData(String topicName) {
         Map<String, String> data = new HashMap<>();
-        DefaultLitePullConsumer consumer =
-                RocketMqAdminUtil.initDefaultLitePullConsumer(newConfiguration(), false);
-        consumer.start();
-        // assign
-        Map<MessageQueue, TopicOffset> queueOffsets =
-                RetryUtils.retryWithException(
-                        () -> {
-                            return RocketMqAdminUtil.offsetTopics(
-                                            newConfiguration(), Lists.newArrayList(topicName))
-                                    .get(0);
-                        },
-                        new RetryUtils.RetryMaterial(
-                                Constant.OPERATION_RETRY_TIME,
-                                false,
-                                exception -> exception instanceof RocketMqConnectorException,
-                                Constant.OPERATION_RETRY_SLEEP));
-        consumer.assign(queueOffsets.keySet());
-        // seek to offset
-        Map<MessageQueue, Long> currentOffsets =
-                RocketMqAdminUtil.currentOffsets(
-                        newConfiguration(), Lists.newArrayList(topicName), queueOffsets.keySet());
-        for (MessageQueue mq : queueOffsets.keySet()) {
-            long currentOffset =
-                    currentOffsets.containsKey(mq)
-                            ? currentOffsets.get(mq)
-                            : queueOffsets.get(mq).getMinOffset();
-            consumer.seek(mq, currentOffset);
-        }
-        while (true) {
-            List<MessageExt> messages = consumer.poll(5000);
-            if (messages.isEmpty()) {
-                break;
+        try {
+            DefaultLitePullConsumer consumer =
+                    RocketMqAdminUtil.initDefaultLitePullConsumer(newConfiguration(), false);
+            consumer.start();
+            // assign
+            Map<MessageQueue, TopicOffset> queueOffsets =
+                    RetryUtils.retryWithException(
+                            () -> {
+                                return RocketMqAdminUtil.offsetTopics(
+                                                newConfiguration(), Lists.newArrayList(topicName))
+                                        .get(0);
+                            },
+                            new RetryUtils.RetryMaterial(
+                                    Constant.OPERATION_RETRY_TIME,
+                                    false,
+                                    exception -> exception instanceof RocketMqConnectorException,
+                                    Constant.OPERATION_RETRY_SLEEP));
+            consumer.assign(queueOffsets.keySet());
+            // seek to offset
+            Map<MessageQueue, Long> currentOffsets =
+                    RocketMqAdminUtil.currentOffsets(
+                            newConfiguration(),
+                            Lists.newArrayList(topicName),
+                            queueOffsets.keySet());
+            for (MessageQueue mq : queueOffsets.keySet()) {
+                long currentOffset =
+                        currentOffsets.containsKey(mq)
+                                ? currentOffsets.get(mq)
+                                : queueOffsets.get(mq).getMinOffset();
+                consumer.seek(mq, currentOffset);
             }
-            for (MessageExt message : messages) {
-                data.put(message.getKeys(), new String(message.getBody(), StandardCharsets.UTF_8));
+            while (true) {
+                List<MessageExt> messages = consumer.poll(5000);
+                if (messages.isEmpty()) {
+                    break;
+                }
+                for (MessageExt message : messages) {
+                    data.put(
+                            message.getKeys(),
+                            new String(message.getBody(), StandardCharsets.UTF_8));
+                    consumer.getOffsetStore()
+                            .updateConsumeOffsetToBroker(
+                                    new MessageQueue(
+                                            message.getTopic(),
+                                            message.getBrokerName(),
+                                            message.getQueueId()),
+                                    message.getQueueOffset(),
+                                    false);
+                }
+                consumer.commitSync();
             }
-            consumer.commitSync();
-        }
-        if (consumer != null) {
-            consumer.shutdown();
+            if (consumer != null) {
+                consumer.shutdown();
+            }
+            log.info("Consumer {} data total {}", topicName, data.size());
+            // consumer.commitSync() only submits the offset to the broker, and NameServer scans the
+            // broker to update the offset every 10 seconds
+            Thread.sleep(20 * 1000);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
         }
-        log.info("Consumer {} data total {}", topicName, data.size());
         return data;
     }
 
@@ -362,6 +381,7 @@ public class RocketMqIT extends TestSuiteBase implements TestResource {
                 .groupId(ROCKETMQ_GROUP)
                 .aclEnable(false)
                 .namesrvAddr(rocketMqContainer.getNameSrvAddr())
+                .batchSize(10)
                 .build();
     }