You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/12/09 01:42:41 UTC
[rocketmq-streams] branch main updated: fix(example) fix example: RocketMQSourceExample1 (#100)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new fba96e8 fix(example) fix example: RocketMQSourceExample1 (#100)
fba96e8 is described below
commit fba96e8c04bc6289fecb34d228636c143331524a
Author: Ni Ze <31...@users.noreply.github.com>
AuthorDate: Thu Dec 9 09:42:33 2021 +0800
fix(example) fix example: RocketMQSourceExample1 (#100)
Co-authored-by: 维章 <un...@gmail.com>
---
.../rocketmq/streams/source/RocketMQSource.java | 26 +++++++++++++++++-----
1 file changed, 21 insertions(+), 5 deletions(-)
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index 5966fee..662f215 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -287,13 +287,17 @@ public class RocketMQSource extends AbstractSupportShuffleSource {
}
MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(defaultMQPushConsumer.getDefaultMQPushConsumer());
RemoteBrokerOffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, NamespaceUtil.wrapNamespace(consumer.getNamespace(), consumer.getConsumerGroup())) {
-
+ Set<MessageQueue> firstComing = new HashSet<>();
@Override
public void removeOffset(MessageQueue mq) {
- Set<String> splitIds = new HashSet<>();
- splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
- removeSplit(splitIds);
- super.removeOffset(mq);
+ if (!firstComing.contains(mq)){
+ firstComing.add(mq);
+ } else {
+ Set<String> splitIds = new HashSet<>();
+ splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
+ removeSplit(splitIds);
+ super.removeOffset(mq);
+ }
}
@Override
@@ -398,4 +402,16 @@ public class RocketMQSource extends AbstractSupportShuffleSource {
public void setConsumerOffset(String consumerOffset) {
this.consumerOffset = consumerOffset;
}
+
+ public String getStrategyName() {
+ return strategyName;
+ }
+
+ public void setStrategyName(String strategyName) {
+ this.strategyName = strategyName;
+ }
+
+ public void setConsumer(DefaultMQPushConsumer consumer) {
+ this.consumer = consumer;
+ }
}
\ No newline at end of file