You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/12/09 01:42:41 UTC

[rocketmq-streams] branch main updated: fix(example) fix example: RocketMQSourceExample1 (#100)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new fba96e8  fix(example) fix example: RocketMQSourceExample1 (#100)
fba96e8 is described below

commit fba96e8c04bc6289fecb34d228636c143331524a
Author: Ni Ze <31...@users.noreply.github.com>
AuthorDate: Thu Dec 9 09:42:33 2021 +0800

    fix(example) fix example: RocketMQSourceExample1 (#100)
    
    Co-authored-by: 维章 <un...@gmail.com>
---
 .../rocketmq/streams/source/RocketMQSource.java    | 26 +++++++++++++++++-----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index 5966fee..662f215 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -287,13 +287,17 @@ public class RocketMQSource extends AbstractSupportShuffleSource {
         }
         MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(defaultMQPushConsumer.getDefaultMQPushConsumer());
         RemoteBrokerOffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, NamespaceUtil.wrapNamespace(consumer.getNamespace(), consumer.getConsumerGroup())) {
-
+            Set<MessageQueue> firstComing = new HashSet<>();
             @Override
             public void removeOffset(MessageQueue mq) {
-                Set<String> splitIds = new HashSet<>();
-                splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
-                removeSplit(splitIds);
-                super.removeOffset(mq);
+                if (!firstComing.contains(mq)){
+                    firstComing.add(mq);
+                } else {
+                    Set<String> splitIds = new HashSet<>();
+                    splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
+                    removeSplit(splitIds);
+                    super.removeOffset(mq);
+                }
             }
 
             @Override
@@ -398,4 +402,16 @@ public class RocketMQSource extends AbstractSupportShuffleSource {
     public void setConsumerOffset(String consumerOffset) {
         this.consumerOffset = consumerOffset;
     }
+
+    public String getStrategyName() {
+        return strategyName;
+    }
+
+    public void setStrategyName(String strategyName) {
+        this.strategyName = strategyName;
+    }
+
+    public void setConsumer(DefaultMQPushConsumer consumer) {
+        this.consumer = consumer;
+    }
 }
\ No newline at end of file