You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2019/07/11 11:59:37 UTC
[rocketmq] branch develop updated: Add RPCHook construct method for
MQPullConsumerScheduleService (#1314)
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c40cdf0 Add RPCHook construct method for MQPullConsumerScheduleService (#1314)
c40cdf0 is described below
commit c40cdf09c0a891adc4b8ce871d6846f3c132d617
Author: Heng Du <du...@apache.org>
AuthorDate: Thu Jul 11 19:59:31 2019 +0800
Add RPCHook construct method for MQPullConsumerScheduleService (#1314)
---
.../rocketmq/client/consumer/MQPullConsumerScheduleService.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
index 44b864e..5436688 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
@@ -26,9 +26,10 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.RPCHook;
/**
* Schedule service for pull consumer
@@ -49,6 +50,11 @@ public class MQPullConsumerScheduleService {
this.defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING);
}
+ public MQPullConsumerScheduleService(final String consumerGroup, final RPCHook rpcHook) {
+ this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup, rpcHook);
+ this.defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING);
+ }
+
public void putTask(String topic, Set<MessageQueue> mqNewSet) {
Iterator<Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
while (it.hasNext()) {