You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/02/22 11:37:42 UTC
[rocketmq] branch snode updated: Polish the push consumer to
support 4.x
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new fbe64ff Polish the push consumer to support 4.x
new 509bce4 Merge pull request #864 from ShannonDing/snode
fbe64ff is described below
commit fbe64ffbe58b026d783254dcb5b4b468b2901c52
Author: ShannonDing <li...@163.com>
AuthorDate: Fri Feb 22 19:28:23 2019 +0800
Polish the push consumer to support 4.x
---
.../client/consumer/DefaultMQPushConsumer.java | 22 +--
...onsumer.java => DefaultMQRealPushConsumer.java} | 36 ++---
.../rocketmq/client/consumer/MQPushConsumer.java | 12 +-
.../client/consumer/MQRealPushConsumer.java | 68 +++++++++
.../ConsumeMessageConcurrentlyService.java | 7 +-
.../consumer/ConsumeMessageOrderlyService.java | 7 +-
.../impl/consumer/DefaultMQPushConsumerImpl.java | 154 ++-------------------
...mpl.java => DefaultMQRealPushConsumerImpl.java} | 61 ++++----
.../client/impl/consumer/MQPushConsumerInner.java | 63 +++++++++
.../client/impl/consumer/ProcessQueue.java | 4 +-
.../client/impl/consumer/PullMessageService.java | 7 +-
.../client/impl/consumer/RebalancePushImpl.java | 7 +-
.../impl/consumer/RebalanceRealPushImpl.java | 2 +-
.../client/impl/factory/MQClientInstance.java | 3 +-
.../client/trace/AsyncTraceDispatcher.java | 7 +-
.../rocketmq/example/quickstart/Consumer.java | 7 +-
.../rocketmq/example/quickstart/Producer.java | 3 +-
.../rocketmq/example/simple/PushConsumer.java | 4 +-
18 files changed, 231 insertions(+), 243 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 91e9f88..dc08ac1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -59,7 +59,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
* <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe.
* </p>
*/
-public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
+public class DefaultMQPushConsumer extends ClientConfig implements MQRealPushConsumer {
private final InternalLogger log = ClientLogger.getLog();
@@ -223,7 +223,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Whether update subscription relationship when every pull
*/
- private boolean postSubscriptionWhenPull = true;
+ private boolean postSubscriptionWhenPull = false;
/**
* Whether the unit of subscription group
@@ -276,24 +276,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
}
/**
- * Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
- *
- * @param consumerGroup Consume queue.
- * @param rpcHook RPC hook to execute before each remoting command.
- * @param allocateMessageQueueStrategy Message queue allocating algorithm.
- */
- public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
- AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) {
- this.consumerGroup = consumerGroup;
- if (allocateMessageQueueStrategy == null) {
- this.allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
- } else {
- this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
- }
- defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook, realPushModel);
- }
-
- /**
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and
* customized trace topic name.
*
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQRealPushConsumer.java
similarity index 95%
copy from client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQRealPushConsumer.java
index 91e9f88..b4ca6f7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQRealPushConsumer.java
@@ -29,6 +29,8 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQRealPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
@@ -59,14 +61,14 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
* <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe.
* </p>
*/
-public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
+public class DefaultMQRealPushConsumer extends ClientConfig implements MQRealPushConsumer {
private final InternalLogger log = ClientLogger.getLog();
/**
* Internal implementation. Most of the functions herein are delegated to it.
*/
- protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
+ protected final transient DefaultMQRealPushConsumerImpl defaultMQPushConsumerImpl;
/**
* Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
@@ -257,7 +259,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Default constructor.
*/
- public DefaultMQPushConsumer() {
+ public DefaultMQRealPushConsumer() {
this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
}
@@ -268,11 +270,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
- public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
+ public DefaultMQRealPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
- defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
+ defaultMQPushConsumerImpl = new DefaultMQRealPushConsumerImpl(this, rpcHook);
}
/**
@@ -282,7 +284,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
- public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
+ public DefaultMQRealPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean realPushModel) {
this.consumerGroup = consumerGroup;
if (allocateMessageQueueStrategy == null) {
@@ -290,7 +292,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
} else {
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
- defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook, realPushModel);
+ defaultMQPushConsumerImpl = new DefaultMQRealPushConsumerImpl(this, rpcHook, realPushModel);
}
/**
@@ -304,12 +306,12 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
- public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
+ public DefaultMQRealPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
- defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
+ defaultMQPushConsumerImpl = new DefaultMQRealPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
@@ -328,7 +330,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*
* @param rpcHook RPC hook to execute before each remoting command.
*/
- public DefaultMQPushConsumer(RPCHook rpcHook) {
+ public DefaultMQRealPushConsumer(RPCHook rpcHook) {
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}
@@ -338,7 +340,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
*/
- public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace) {
+ public DefaultMQRealPushConsumer(final String consumerGroup, boolean enableMsgTrace) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null);
}
@@ -350,7 +352,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
- public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace,
+ public DefaultMQRealPushConsumer(final String consumerGroup, boolean enableMsgTrace,
final String customizedTraceTopic) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}
@@ -360,7 +362,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*
* @param consumerGroup Consumer group.
*/
- public DefaultMQPushConsumer(final String consumerGroup) {
+ public DefaultMQRealPushConsumer(final String consumerGroup) {
this(consumerGroup, null, new AllocateMessageQueueAveragely());
}
@@ -474,7 +476,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.consumeThreadMin = consumeThreadMin;
}
- public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() {
+ public MQPushConsumerInner getDefaultMQPushConsumerImpl() {
return defaultMQPushConsumerImpl;
}
@@ -675,9 +677,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Subscribe a topic by message selector.
*
* @param topic topic to consume.
- * @param messageSelector {@link org.apache.rocketmq.client.consumer.MessageSelector}
- * @see org.apache.rocketmq.client.consumer.MessageSelector#bySql
- * @see org.apache.rocketmq.client.consumer.MessageSelector#byTag
+ * @param messageSelector {@link MessageSelector}
+ * @see MessageSelector#bySql
+ * @see MessageSelector#byTag
*/
@Override
public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
index bc6d328..e60085f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
@@ -20,6 +20,7 @@ import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
/**
* Push consumer
@@ -49,14 +50,13 @@ public interface MQPushConsumer extends MQConsumer {
* Subscribe some topic
*
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
- * null or * expression,meaning subscribe
- * all
+ * null or * expression,meaning subscribe all
*/
void subscribe(final String topic, final String subExpression) throws MQClientException;
/**
- * This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code>
- * is recommended.
+ * This method will be removed in the version 5.0.0,because filterServer was removed,and method
+ * <code>subscribe(final String topic, final MessageSelector messageSelector)</code> is recommended.
*
* Subscribe some topic
*
@@ -70,8 +70,8 @@ public interface MQPushConsumer extends MQConsumer {
/**
* Subscribe some topic with selector.
* <p>
- * This interface also has the ability of {@link #subscribe(String, String)},
- * and, support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}.
+ * This interface also has the ability of {@link #subscribe(String, String)}, and, support other message selection,
+ * such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}.
* </p>
* <p/>
* <p>
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQRealPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQRealPushConsumer.java
new file mode 100644
index 0000000..80e4e34
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQRealPushConsumer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+
+/**
+ * Real Push consumer
+ */
+public interface MQRealPushConsumer extends MQPushConsumer {
+
+ MQPushConsumerInner getDefaultMQPushConsumerImpl();
+
+ long getConsumeTimeout();
+
+ int getPullThresholdForTopic();
+
+ int getPullThresholdForQueue();
+
+ void setPullThresholdForQueue(int pullThresholdForQueue);
+
+ int getPullThresholdSizeForTopic();
+
+ void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic);
+
+ int getPullThresholdSizeForQueue();
+
+ void setPullThresholdSizeForQueue(final int pullThresholdSizeForQueue);
+
+ ConsumeFromWhere getConsumeFromWhere();
+
+ String getConsumeTimestamp();
+
+ String getConsumerGroup();
+
+ int getConsumeThreadMin();
+
+ void setConsumeThreadMin(int consumeThreadMin);
+
+ int getConsumeThreadMax();
+
+ long getSuspendCurrentQueueTimeMillis();
+
+ int getMaxReconsumeTimes();
+
+ void setMaxReconsumeTimes(final int maxReconsumeTimes);
+
+ int getConsumeMessageBatchMaxSize();
+
+ MessageModel getMessageModel();
+
+}
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 58b985f..c6c3f2d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
@@ -50,8 +51,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog();
- private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
- private final DefaultMQPushConsumer defaultMQPushConsumer;
+ private final MQPushConsumerInner defaultMQPushConsumerImpl;
+ private final MQRealPushConsumer defaultMQPushConsumer;
private final MessageListenerConcurrently messageListener;
private final BlockingQueue<Runnable> consumeRequestQueue;
private final ThreadPoolExecutor consumeExecutor;
@@ -60,7 +61,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
private final ScheduledExecutorService scheduledExecutorService;
private final ScheduledExecutorService cleanExpireMsgExecutors;
- public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
+ public ConsumeMessageConcurrentlyService(MQPushConsumerInner defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index d4a9953..e1717f8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
@@ -52,8 +53,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog();
private final static long MAX_TIME_CONSUME_CONTINUOUSLY =
Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));
- private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
- private final DefaultMQPushConsumer defaultMQPushConsumer;
+ private final MQPushConsumerInner defaultMQPushConsumerImpl;
+ private final MQRealPushConsumer defaultMQPushConsumer;
private final MessageListenerOrderly messageListener;
private final BlockingQueue<Runnable> consumeRequestQueue;
private final ThreadPoolExecutor consumeExecutor;
@@ -62,7 +63,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private final ScheduledExecutorService scheduledExecutorService;
private volatile boolean stopped = false;
- public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
+ public ConsumeMessageOrderlyService(MQPushConsumerInner defaultMQPushConsumerImpl,
MessageListenerOrderly messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index ff5f418..6108530 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -26,10 +26,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -76,9 +73,10 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
-public class DefaultMQPushConsumerImpl implements MQConsumerInner {
+public class DefaultMQPushConsumerImpl implements MQPushConsumerInner {
/**
* Delay some time when exception occur
*/
@@ -95,8 +93,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumer defaultMQPushConsumer;
- //private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
- private final RebalanceImpl rebalanceImpl;
+ private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private final long consumerStartTimestamp = System.currentTimeMillis();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
@@ -111,26 +108,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private ConsumeMessageService consumeMessageService;
private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0;
- private boolean realPushModel = true;
- private final ConcurrentHashMap<String, AtomicLong> localConsumerOffset = new ConcurrentHashMap<String, AtomicLong>();
- private final ConcurrentHashMap<String, AtomicBoolean> pullStopped = new ConcurrentHashMap<String, AtomicBoolean>();
- private final ConcurrentHashMap<String, ProcessQueue> processQueues = new ConcurrentHashMap<String, ProcessQueue>();
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
- this(defaultMQPushConsumer, rpcHook, true);
- }
-
- public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook,
- boolean realPushModel) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
- this.realPushModel = realPushModel;
- if (realPushModel) {
- log.info("Open Real Push Model for {}", defaultMQPushConsumer.getConsumerGroup());
- rebalanceImpl = new RebalanceRealPushImpl(this);
- } else {
- rebalanceImpl = new RebalancePushImpl(this);
- }
}
public void registerFilterMessageHook(final FilterMessageHook hook) {
@@ -307,17 +288,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
- //Update local offset according remote offset
- String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(),
- pullRequest.getMessageQueue().getBrokerName(),
- pullRequest.getMessageQueue().getQueueId());
- AtomicLong localOffset = localConsumerOffset.get(localOffsetKey);
- if (localOffset == null) {
- localConsumerOffset.putIfAbsent(localOffsetKey, new AtomicLong(-1));
- }
- localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
-
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
@@ -475,15 +445,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
- String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(),
- pullRequest.getMessageQueue().getBrokerName(),
- pullRequest.getMessageQueue().getQueueId());
- if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
- //Stop pull request
- log.info("Stop pull request, {}", localOffsetKey);
- return;
- }
this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
}
@@ -500,15 +461,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
- String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(),
- pullRequest.getMessageQueue().getBrokerName(),
- pullRequest.getMessageQueue().getQueueId());
- if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) {
- //Stop pull request
- log.info("Stop pull request, {}", localOffsetKey);
- return;
- }
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
@@ -545,8 +497,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
- String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish();
- this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, snodeAddr, msg,
+ String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
+ : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+ this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(null, brokerAddr,msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
@@ -675,10 +628,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
default:
break;
}
- this.tryToFindSnodePublishInfo();
+
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
- this.mQClientFactory.sendHeartbeatToAllSnodeWithLock();
+ this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
@@ -896,8 +849,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- this.mQClientFactory.createRetryTopic(topic, this.defaultMQPushConsumer.getConsumerGroup());
-
}
}
}
@@ -1020,11 +971,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override
public ConsumeType consumeType() {
- if (realPushModel) {
- return ConsumeType.CONSUME_PUSH;
- } else {
- return ConsumeType.CONSUME_PASSIVELY;
- }
+ return ConsumeType.CONSUME_PASSIVELY;
}
@Override
@@ -1193,90 +1140,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
- private void tryToFindSnodePublishInfo() {
- this.mQClientFactory.updateSnodeInfoFromNameServer();
- }
-
- public boolean processPushMessage(final MessageExt msg,
- final String consumerGroup,
- final String topic,
- final String brokerName,
- final int queueID,
- final long offset) {
- String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
- AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey);
- if (localOffset == null) {
- log.info("Current Local offset have not set, initiallized to -1.");
- this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
- return false;
- }
- if (localOffset.get() + 1 < offset) {
- //should start pull message process
- log.debug("#####Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
- return false;
- } else {
- //Stop pull request
- log.debug("#####Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
- AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
- if (pullStop == null) {
- this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
- log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
- }
- pullStop = this.pullStopped.get(localOffsetKey);
- if (!pullStop.get()) {
- pullStop.set(true);
- log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
- }
- //update local offset
- localOffset.set(offset);
- //submit to process queue
- List<MessageExt> messageExtList = new ArrayList<MessageExt>();
- messageExtList.add(msg);
- ProcessQueue processQueue = processQueues.get(localOffsetKey);
- if (processQueue == null) {
- processQueues.put(localOffsetKey, new ProcessQueue());
- processQueue = processQueues.get(localOffsetKey);
- }
- processQueue.putMessage(messageExtList);
- MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID);
- this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true);
- }
- return true;
- }
-
- private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) {
- return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID;
- }
-
public boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
- String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
- ProcessQueue processQueue = processQueues.get(localOffsetKey);
- if (processQueue != null) {
- log.info("Clear local expire message for {} in processQueue.", localOffsetKey);
- processQueue.cleanExpiredMsg(this.defaultMQPushConsumer);
- }
- AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
- if (pullStop != null) {
- if (pullStop.get()) {
- pullStop.set(false);
- log.info("Resume Pull Request of {} is set to TRUE, and then the pull request will start by rebalance again...", localOffsetKey);
- }
- }
- return true;
- }
-
- public boolean pausePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
- String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
- AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
- if (pullStop == null) {
- this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
- log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
- return true;
- }
- if (!pullStop.get()) {
- pullStop.set(true);
- log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
- }
return true;
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java
similarity index 95%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
copy to client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java
index ff5f418..785e5b8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java
@@ -33,6 +33,9 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
@@ -78,7 +81,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
-public class DefaultMQPushConsumerImpl implements MQConsumerInner {
+public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
/**
* Delay some time when exception occur
*/
@@ -94,7 +97,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
private final InternalLogger log = ClientLogger.getLog();
- private final DefaultMQPushConsumer defaultMQPushConsumer;
+ private final DefaultMQRealPushConsumer defaultMQPushConsumer;
//private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
private final RebalanceImpl rebalanceImpl;
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
@@ -116,21 +119,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private final ConcurrentHashMap<String, AtomicBoolean> pullStopped = new ConcurrentHashMap<String, AtomicBoolean>();
private final ConcurrentHashMap<String, ProcessQueue> processQueues = new ConcurrentHashMap<String, ProcessQueue>();
- public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
+ public DefaultMQRealPushConsumerImpl(DefaultMQRealPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this(defaultMQPushConsumer, rpcHook, true);
}
- public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook,
+ public DefaultMQRealPushConsumerImpl(DefaultMQRealPushConsumer defaultMQPushConsumer, RPCHook rpcHook,
boolean realPushModel) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
this.realPushModel = realPushModel;
- if (realPushModel) {
- log.info("Open Real Push Model for {}", defaultMQPushConsumer.getConsumerGroup());
- rebalanceImpl = new RebalanceRealPushImpl(this);
- } else {
- rebalanceImpl = new RebalancePushImpl(this);
- }
+ log.info("Open Real Push Model for {}", defaultMQPushConsumer.getConsumerGroup());
+ rebalanceImpl = new RebalanceRealPushImpl(this);
}
public void registerFilterMessageHook(final FilterMessageHook hook) {
@@ -191,7 +190,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
return result;
}
- public DefaultMQPushConsumer getDefaultMQPushConsumer() {
+ public MQRealPushConsumer getDefaultMQPushConsumer() {
return defaultMQPushConsumer;
}
@@ -318,7 +317,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
- pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
+ pullResult = DefaultMQRealPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
@@ -326,30 +325,30 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
- DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
+ DefaultMQRealPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+ DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
- DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
+ DefaultMQRealPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
- DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
+ DefaultMQRealPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
- if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
- DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
+ if (DefaultMQRealPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
+ DefaultMQRealPushConsumerImpl.this.executePullRequestLater(pullRequest,
+ DefaultMQRealPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+ DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
@@ -366,16 +365,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
- DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
+ DefaultMQRealPushConsumerImpl.this.correctTagsOffset(pullRequest);
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+ DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
- DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
+ DefaultMQRealPushConsumerImpl.this.correctTagsOffset(pullRequest);
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
+ DefaultMQRealPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
@@ -383,17 +382,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
- DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
+ DefaultMQRealPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
- DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
+ DefaultMQRealPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
- DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
+ DefaultMQRealPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
- DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
+ DefaultMQRealPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
@@ -414,7 +413,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
log.warn("execute the pull request exception", e);
}
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ DefaultMQRealPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
@@ -856,7 +855,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
try {
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
- for (final Map.Entry<String, String> entry : sub.entrySet()) {
+ for (final Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
@@ -893,7 +892,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
- for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
+ for (final Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
this.mQClientFactory.createRetryTopic(topic, this.defaultMQPushConsumer.getConsumerGroup());
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQPushConsumerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQPushConsumerInner.java
new file mode 100644
index 0000000..d29667d
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQPushConsumerInner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.stat.ConsumerStatsManager;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+/**
+ * Push Consumer inner interface
+ */
+public interface MQPushConsumerInner extends MQConsumerInner {
+
+ MQRealPushConsumer getDefaultMQPushConsumer();
+
+ OffsetStore getOffsetStore();
+
+ boolean isConsumeOrderly();
+
+ MQClientInstance getmQClientFactory();
+
+ boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID);
+
+ void executePullRequestImmediately(final PullRequest pullRequest);
+
+ RebalanceImpl getRebalanceImpl();
+
+ ConsumerStatsManager getConsumerStatsManager();
+
+ void sendMessageBack(MessageExt msg, int delayLevel,
+ final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+
+ boolean hasHook();
+
+ void registerConsumeMessageHook(final ConsumeMessageHook hook);
+
+ void executeHookBefore(final ConsumeMessageContext context);
+
+ void executeHookAfter(final ConsumeMessageContext context);
+
+ void pullMessage(final PullRequest pullRequest);
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 0a52817..661bf03 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -27,6 +27,8 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQRealPushConsumer;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -73,7 +75,7 @@ public class ProcessQueue {
/**
* @param pushConsumer
*/
- public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
+ public void cleanExpiredMsg(MQRealPushConsumer pushConsumer) {
if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
return;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
index bd46a58..d58eed3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
@@ -79,7 +79,12 @@ public class PullMessageService extends ServiceThread {
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
- DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
+ MQPushConsumerInner impl;
+ if (consumer instanceof DefaultMQRealPushConsumerImpl) {
+ impl = (DefaultMQRealPushConsumerImpl) consumer;
+ } else {
+ impl = (DefaultMQPushConsumerImpl) consumer;
+ }
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 2e622be..8768ffa 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -20,6 +20,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -34,15 +35,15 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
public class RebalancePushImpl extends RebalanceImpl {
private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
- private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
+ private final MQPushConsumerInner defaultMQPushConsumerImpl;
- public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
+ public RebalancePushImpl(MQPushConsumerInner defaultMQPushConsumerImpl) {
this(null, null, null, null, defaultMQPushConsumerImpl);
}
public RebalancePushImpl(String consumerGroup, MessageModel messageModel,
AllocateMessageQueueStrategy allocateMessageQueueStrategy,
- MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
+ MQClientInstance mQClientFactory, MQPushConsumerInner defaultMQPushConsumerImpl) {
super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java
index 8bd2701..8ee30a1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceRealPushImpl.java
@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
public class RebalanceRealPushImpl extends RebalancePushImpl {
- public RebalanceRealPushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
+ public RebalanceRealPushImpl(MQPushConsumerInner defaultMQPushConsumerImpl) {
super(defaultMQPushConsumerImpl);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 0953fdc..d5dcb39 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQRealPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullMessageService;
@@ -1387,7 +1388,7 @@ public class MQClientInstance {
consumerGroup, topic, brokerName, queueID, offset);
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
if (null != mqConsumerInner) {
- DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
+ DefaultMQRealPushConsumerImpl consumer = (DefaultMQRealPushConsumerImpl) mqConsumerInner;
consumer.processPushMessage(msg, consumerGroup, topic, brokerName, queueID, offset);
return true;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 87a795e..dd795cd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.MQPushConsumerInner;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
@@ -67,7 +68,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private volatile Thread shutDownHook;
private volatile boolean stopped = false;
private DefaultMQProducerImpl hostProducer;
- private DefaultMQPushConsumerImpl hostConsumer;
+ private MQPushConsumerInner hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;
@@ -117,11 +118,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.hostProducer = hostProducer;
}
- public DefaultMQPushConsumerImpl getHostConsumer() {
+ public MQPushConsumerInner getHostConsumer() {
return hostConsumer;
}
- public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
+ public void setHostConsumer(MQPushConsumerInner hostConsumer) {
this.hostConsumer = hostConsumer;
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
index df3b3b0..233b002 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.example.quickstart;
import java.util.List;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@@ -26,7 +26,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
/**
- * This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}.
+ * This example shows how to subscribe and consume messages using providing {@link DefaultMQRealPushConsumer}.
*/
public class Consumer {
@@ -35,7 +35,7 @@ public class Consumer {
/*
* Instantiate with specified consumer group name.
*/
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello");
+ DefaultMQRealPushConsumer consumer = new DefaultMQRealPushConsumer("hello");
/*
* Specify name server addresses.
@@ -61,7 +61,6 @@ public class Consumer {
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
-// consumer.setNamesrvAddr("47.102.149.193:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
index 98dcd61..80c27f4 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
@@ -47,7 +47,6 @@ public class Producer {
/*
* Launch the instance.
*/
-// producer.setNamesrvAddr("47.102.149.193:9876");
producer.start();
for (int i = 0; i < 10; i++) {
@@ -76,7 +75,7 @@ public class Producer {
/*
* Shut down once the producer instance is not longer in use.
*/
- Thread.sleep(30000L);
+ Thread.sleep(3000L);
producer.shutdown();
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
index abbfbdf..29a51ac 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.example.simple;
import java.util.List;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQRealPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@@ -28,7 +28,7 @@ import org.apache.rocketmq.common.message.MessageExt;
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
+ DefaultMQRealPushConsumer consumer = new DefaultMQRealPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800