You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/04/19 01:35:08 UTC
[rocketmq] branch pop_consumer updated: [RIP-19] Pop Consuming (client)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch pop_consumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/pop_consumer by this push:
new 9d8f4c2 [RIP-19] Pop Consuming (client)
new cd44623 Merge pull request #2808 from hill007299/pop_consumer
9d8f4c2 is described below
commit 9d8f4c2cc0e30d8240fb325ed047634f1ad67275
Author: hill007299 <hi...@126.com>
AuthorDate: Tue Mar 9 11:17:55 2021 +0800
[RIP-19] Pop Consuming (client)
---
.../AckCallback.java} | 29 +-
.../AckResult.java} | 45 +-
.../AckStatus.java} | 38 +-
.../client/consumer/DefaultMQPushConsumer.java | 51 +++
.../PopCallback.java} | 32 +-
.../apache/rocketmq/client/consumer/PopResult.java | 82 ++++
.../PopStatus.java} | 47 +-
...MessageService.java => BaseInvokeCallback.java} | 34 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 286 +++++++++++-
.../ConsumeMessageConcurrentlyService.java | 8 +-
.../consumer/ConsumeMessageOrderlyService.java | 14 +-
...a => ConsumeMessagePopConcurrentlyService.java} | 265 +++++------
.../consumer/ConsumeMessagePopOrderlyService.java | 408 +++++++++++++++++
.../impl/consumer/ConsumeMessageService.java | 5 +
.../impl/consumer/DefaultLitePullConsumerImpl.java | 6 +
.../impl/consumer/DefaultMQPushConsumerImpl.java | 331 +++++++++++++-
.../client/impl/consumer/MessageQueueLock.java | 29 +-
...sumeMessageService.java => MessageRequest.java} | 27 +-
.../client/impl/consumer/PopProcessQueue.java | 84 ++++
.../consumer/{PullRequest.java => PopRequest.java} | 61 ++-
.../client/impl/consumer/PullAPIWrapper.java | 55 ++-
.../client/impl/consumer/PullMessageService.java | 47 +-
.../rocketmq/client/impl/consumer/PullRequest.java | 8 +-
.../client/impl/consumer/RebalanceImpl.java | 478 ++++++++++++++++++--
.../impl/consumer/RebalanceLitePullImpl.java | 25 +-
.../client/impl/consumer/RebalancePullImpl.java | 26 +-
.../client/impl/consumer/RebalancePushImpl.java | 56 ++-
.../client/impl/factory/MQClientInstance.java | 37 +-
.../consumer/DefaultLitePullConsumerTest.java | 27 +-
.../client/consumer/DefaultMQPushConsumerTest.java | 36 +-
.../rocketmq/client/impl/MQClientAPIImplTest.java | 490 ++++++++++++++++++++-
.../consumer/DefaultMQPushConsumerImplTest.java | 92 ++++
.../impl/consumer/RebalancePushImplTest.java | 47 +-
.../client/impl/factory/MQClientInstanceTest.java | 1 +
.../client/producer/DefaultMQProducerTest.java | 5 +-
.../trace/DefaultMQConsumerWithTraceTest.java | 4 -
.../powermock/extensions/configuration.properties | 16 +
37 files changed, 2883 insertions(+), 449 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/AckCallback.java
similarity index 50%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/AckCallback.java
index 5078c97..99a261c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/AckCallback.java
@@ -14,31 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.impl.consumer;
+package org.apache.rocketmq.client.consumer;
-import java.util.List;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+public interface AckCallback {
+ void onSuccess(final AckResult ackResult);
-public interface ConsumeMessageService {
- void start();
-
- void shutdown(long awaitTerminateMillis);
-
- void updateCorePoolSize(int corePoolSize);
-
- void incCorePoolSize();
-
- void decCorePoolSize();
-
- int getCorePoolSize();
-
- ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
-
- void submitConsumeRequest(
- final List<MessageExt> msgs,
- final ProcessQueue processQueue,
- final MessageQueue messageQueue,
- final boolean dispathToConsume);
+ void onException(final Throwable e);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/AckResult.java
similarity index 50%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/AckResult.java
index 5078c97..06cb59a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/AckResult.java
@@ -14,31 +14,40 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.impl.consumer;
+package org.apache.rocketmq.client.consumer;
-import java.util.List;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-public interface ConsumeMessageService {
- void start();
+public class AckResult {
+ private AckStatus status;
+ private String extraInfo;
+ private long popTime;
- void shutdown(long awaitTerminateMillis);
+ public void setPopTime(long popTime) {
+ this.popTime = popTime;
+ }
- void updateCorePoolSize(int corePoolSize);
+ public long getPopTime() {
+ return popTime;
+ }
- void incCorePoolSize();
+ public AckStatus getStatus() {
+ return status;
+ }
- void decCorePoolSize();
+ public void setStatus(AckStatus status) {
+ this.status = status;
+ }
- int getCorePoolSize();
+ public void setExtraInfo(String extraInfo) {
+ this.extraInfo = extraInfo;
+ }
- ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
+ public String getExtraInfo() {
+ return extraInfo;
+ }
- void submitConsumeRequest(
- final List<MessageExt> msgs,
- final ProcessQueue processQueue,
- final MessageQueue messageQueue,
- final boolean dispathToConsume);
+ @Override
+ public String toString() {
+ return "AckResult [AckStatus=" + status + ",extraInfo=" + extraInfo + "]";
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/AckStatus.java
similarity index 50%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/AckStatus.java
index 5078c97..b144f8f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/AckStatus.java
@@ -14,31 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.impl.consumer;
-
-import java.util.List;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-
-public interface ConsumeMessageService {
- void start();
-
- void shutdown(long awaitTerminateMillis);
-
- void updateCorePoolSize(int corePoolSize);
-
- void incCorePoolSize();
-
- void decCorePoolSize();
-
- int getCorePoolSize();
-
- ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
-
- void submitConsumeRequest(
- final List<MessageExt> msgs,
- final ProcessQueue processQueue,
- final MessageQueue messageQueue,
- final boolean dispathToConsume);
+package org.apache.rocketmq.client.consumer;
+
+public enum AckStatus {
+ /**
+ * ack success
+ */
+ OK,
+ /**
+ * msg not exist
+ */
+ NO_EXIST,
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 9011117..d32dd15 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -180,6 +180,12 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private int pullThresholdForQueue = 1000;
/**
+ * Flow control threshold on queue level: the maximum number of messages waiting to be acked.
+ * In contrast with the pull threshold, once a message is popped, its consumption is considered to have begun.
+ */
+ private int popThresholdForQueue = 96;
+
+ /**
* Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
*
@@ -255,6 +261,16 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private long consumeTimeout = 15;
/**
+ * Maximum invisible time of a message in milliseconds; range is [5000, 300000]
+ */
+ private long popInvisibleTime = 60000;
+
+ /**
+ * Batch pop size. range is [1, 32]
+ */
+ private int popBatchNums = 32;
+
+ /**
* Maximum time to await message consuming when shutdown consumer, 0 indicates no await.
*/
private long awaitTerminationMillisWhenShutdown = 0;
@@ -264,6 +280,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private TraceDispatcher traceDispatcher = null;
+ // force to use client rebalance
+ private boolean clientRebalance = false;
+
/**
* Default constructor.
*/
@@ -598,6 +617,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.pullThresholdForQueue = pullThresholdForQueue;
}
+ public int getPopThresholdForQueue() {
+ return popThresholdForQueue;
+ }
+
+ public void setPopThresholdForQueue(int popThresholdForQueue) {
+ this.popThresholdForQueue = popThresholdForQueue;
+ }
+
public int getPullThresholdForTopic() {
return pullThresholdForTopic;
}
@@ -891,6 +918,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.consumeTimeout = consumeTimeout;
}
+ public long getPopInvisibleTime() {
+ return popInvisibleTime;
+ }
+
+ public void setPopInvisibleTime(long popInvisibleTime) {
+ this.popInvisibleTime = popInvisibleTime;
+ }
+
public long getAwaitTerminationMillisWhenShutdown() {
return awaitTerminationMillisWhenShutdown;
}
@@ -902,4 +937,20 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public TraceDispatcher getTraceDispatcher() {
return traceDispatcher;
}
+
+ public int getPopBatchNums() {
+ return popBatchNums;
+ }
+
+ public void setPopBatchNums(int popBatchNums) {
+ this.popBatchNums = popBatchNums;
+ }
+
+ public boolean isClientRebalance() {
+ return clientRebalance;
+ }
+
+ public void setClientRebalance(boolean clientRebalance) {
+ this.clientRebalance = clientRebalance;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PopCallback.java
similarity index 50%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/PopCallback.java
index 5078c97..4932e74 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PopCallback.java
@@ -14,31 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.impl.consumer;
+package org.apache.rocketmq.client.consumer;
-import java.util.List;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-
-public interface ConsumeMessageService {
- void start();
-
- void shutdown(long awaitTerminateMillis);
-
- void updateCorePoolSize(int corePoolSize);
-
- void incCorePoolSize();
-
- void decCorePoolSize();
-
- int getCorePoolSize();
-
- ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
+/**
+ * Async message pop interface
+ */
+public interface PopCallback {
+ void onSuccess(final PopResult popResult);
- void submitConsumeRequest(
- final List<MessageExt> msgs,
- final ProcessQueue processQueue,
- final MessageQueue messageQueue,
- final boolean dispathToConsume);
+ void onException(final Throwable e);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PopResult.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PopResult.java
new file mode 100644
index 0000000..6423e90
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PopResult.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageExt;
+
+public class PopResult {
+ private List<MessageExt> msgFoundList;
+ private PopStatus popStatus;
+ private long popTime;
+ private long invisibleTime;
+ private long restNum;
+
+ public PopResult(PopStatus popStatus, List<MessageExt> msgFoundList) {
+ this.popStatus = popStatus;
+ this.msgFoundList = msgFoundList;
+ }
+
+ public long getPopTime() {
+ return popTime;
+ }
+
+
+ public void setPopTime(long popTime) {
+ this.popTime = popTime;
+ }
+
+ public long getRestNum() {
+ return restNum;
+ }
+
+ public void setRestNum(long restNum) {
+ this.restNum = restNum;
+ }
+
+ public long getInvisibleTime() {
+ return invisibleTime;
+ }
+
+
+ public void setInvisibleTime(long invisibleTime) {
+ this.invisibleTime = invisibleTime;
+ }
+
+
+ public void setPopStatus(PopStatus popStatus) {
+ this.popStatus = popStatus;
+ }
+
+ public PopStatus getPopStatus() {
+ return popStatus;
+ }
+
+ public List<MessageExt> getMsgFoundList() {
+ return msgFoundList;
+ }
+
+ public void setMsgFoundList(List<MessageExt> msgFoundList) {
+ this.msgFoundList = msgFoundList;
+ }
+
+ @Override
+ public String toString() {
+ return "PopResult [popStatus=" + popStatus + ",msgFoundList="
+ + (msgFoundList == null ? 0 : msgFoundList.size()) + ",restNum=" + restNum + "]";
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PopStatus.java
similarity index 50%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/PopStatus.java
index 5078c97..17dda9a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PopStatus.java
@@ -14,31 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.impl.consumer;
-
-import java.util.List;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-
-public interface ConsumeMessageService {
- void start();
-
- void shutdown(long awaitTerminateMillis);
-
- void updateCorePoolSize(int corePoolSize);
-
- void incCorePoolSize();
-
- void decCorePoolSize();
-
- int getCorePoolSize();
-
- ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
-
- void submitConsumeRequest(
- final List<MessageExt> msgs,
- final ProcessQueue processQueue,
- final MessageQueue messageQueue,
- final boolean dispathToConsume);
+package org.apache.rocketmq.client.consumer;
+
+public enum PopStatus {
+ /**
+ * Found
+ */
+ FOUND,
+ /**
+ * No new message can be pulled after polling times out;
+ * to be deleted after the next release
+ */
+ NO_NEW_MSG,
+ /**
+ * polling pool is full, do not try again immediately.
+ */
+ POLLING_FULL,
+ /**
+ * polling timed out but no message was found
+ */
+ POLLING_NOT_FOUND
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java
similarity index 50%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
copy to client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java
index 5078c97..baf6f17 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/BaseInvokeCallback.java
@@ -14,31 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.impl.consumer;
-import java.util.List;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+package org.apache.rocketmq.client.impl;
-public interface ConsumeMessageService {
- void start();
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
- void shutdown(long awaitTerminateMillis);
+public abstract class BaseInvokeCallback implements InvokeCallback {
+ private final MQClientAPIImpl mqClientAPI;
- void updateCorePoolSize(int corePoolSize);
+ public BaseInvokeCallback(MQClientAPIImpl mqClientAPI) {
+ this.mqClientAPI = mqClientAPI;
+ }
- void incCorePoolSize();
+ @Override
+ public void operationComplete(final ResponseFuture responseFuture) {
+ onComplete(responseFuture);
+ }
- void decCorePoolSize();
-
- int getCorePoolSize();
-
- ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
-
- void submitConsumeRequest(
- final List<MessageExt> msgs,
- final ProcessQueue processQueue,
- final MessageQueue messageQueue,
- final boolean dispathToConsume);
+ public abstract void onComplete(final ResponseFuture responseFuture);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 7a4d556..f3b4284 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.impl;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -29,6 +30,12 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.AckStatus;
+import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
@@ -59,6 +66,8 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
@@ -77,15 +86,21 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryAssignmentRequestBody;
+import org.apache.rocketmq.common.protocol.body.QueryAssignmentResponseBody;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.common.protocol.body.SetMessageRequestModeRequestBody;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeResponseHeader;
import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
@@ -95,6 +110,7 @@ import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeade
import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
@@ -113,6 +129,8 @@ import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumeQueueRequestHeader;
@@ -144,10 +162,12 @@ import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHead
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
@@ -243,6 +263,34 @@ public class MQClientAPIImpl {
this.remotingClient.shutdown();
}
+ public Set<MessageQueueAssignment> queryAssignment(final String addr, final String topic,
+ final String consumerGroup, final String clientId, final String strategyName,
+ final MessageModel messageModel, final long timeoutMillis)
+ throws RemotingException, MQBrokerException, InterruptedException {
+ QueryAssignmentRequestBody requestBody = new QueryAssignmentRequestBody();
+ requestBody.setTopic(topic);
+ requestBody.setConsumerGroup(consumerGroup);
+ requestBody.setClientId(clientId);
+ requestBody.setMessageModel(messageModel);
+ requestBody.setStrategyName(strategyName);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_ASSIGNMENT, null);
+ request.setBody(requestBody.encode());
+
+ RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+ request, timeoutMillis);
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ QueryAssignmentResponseBody queryAssignmentResponseBody = QueryAssignmentResponseBody.decode(response.getBody(), QueryAssignmentResponseBody.class);
+ return queryAssignmentResponseBody.getMessageQueueAssignments();
+ }
+ default:
+ break;
+ }
+
+ throw new MQBrokerException(response.getCode(), response.getRemark());
+ }
+
public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
@@ -394,7 +442,8 @@ public class MQClientAPIImpl {
}
- public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
+ public AclConfig getBrokerClusterConfig(final String addr,
+ final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG, null);
@@ -404,7 +453,7 @@ public class MQClientAPIImpl {
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
GetBrokerClusterAclConfigResponseBody body =
- GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class);
+ GetBrokerClusterAclConfigResponseBody.decode(response.getBody(), GetBrokerClusterAclConfigResponseBody.class);
AclConfig aclConfig = new AclConfig();
aclConfig.setGlobalWhiteAddrs(body.getGlobalWhiteAddrs());
aclConfig.setPlainAccessConfigs(body.getPlainAccessConfigs());
@@ -502,7 +551,7 @@ public class MQClientAPIImpl {
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
- return this.processSendResponse(brokerName, msg, response,addr);
+ return this.processSendResponse(brokerName, msg, response, addr);
}
private void sendMessageAsync(
@@ -668,7 +717,7 @@ public class MQClientAPIImpl {
}
SendMessageResponseHeader responseHeader =
- (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+ (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
@@ -687,8 +736,8 @@ public class MQClientAPIImpl {
uniqMsgId = sb.toString();
}
SendResult sendResult = new SendResult(sendStatus,
- uniqMsgId,
- responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
+ uniqMsgId,
+ responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
@@ -730,6 +779,123 @@ public class MQClientAPIImpl {
return null;
}
+ public void popMessageAsync(
+ final String brokerName, final String addr, final PopMessageRequestHeader requestHeader,
+ final long timeoutMillis, final PopCallback popCallback
+ ) throws RemotingException, InterruptedException {
+ final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader);
+ this.remotingClient.invokeAsync(addr, request, timeoutMillis, new BaseInvokeCallback(MQClientAPIImpl.this) {
+ @Override
+ public void onComplete(ResponseFuture responseFuture) {
+ RemotingCommand response = responseFuture.getResponseCommand();
+ if (response != null) {
+ try {
+ PopResult
+ popResult = MQClientAPIImpl.this.processPopResponse(brokerName, response, requestHeader.getTopic(), requestHeader);
+ assert popResult != null;
+ popCallback.onSuccess(popResult);
+ } catch (Exception e) {
+ popCallback.onException(e);
+ }
+ } else {
+ if (!responseFuture.isSendRequestOK()) {
+ popCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
+ } else if (responseFuture.isTimeout()) {
+ popCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
+ responseFuture.getCause()));
+ } else {
+ popCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
+ }
+ }
+ }
+ });
+ }
+
+ public void ackMessageAsync(
+ final String addr,
+ final long timeOut,
+ final AckCallback ackCallback,
+ final AckMessageRequestHeader requestHeader //
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+ final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) {
+
+ @Override
+ public void onComplete(ResponseFuture responseFuture) {
+ RemotingCommand response = responseFuture.getResponseCommand();
+ if (response != null) {
+ try {
+ AckResult ackResult = new AckResult();
+ if (ResponseCode.SUCCESS == response.getCode()) {
+ ackResult.setStatus(AckStatus.OK);
+ } else {
+ ackResult.setStatus(AckStatus.NO_EXIST);
+ }
+ assert ackResult != null;
+ ackCallback.onSuccess(ackResult);
+ } catch (Exception e) {
+ ackCallback.onException(e);
+ }
+ } else {
+ if (!responseFuture.isSendRequestOK()) {
+ ackCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
+ } else if (responseFuture.isTimeout()) {
+ ackCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
+ responseFuture.getCause()));
+ } else {
+ ackCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeOut + ". Request: " + request, responseFuture.getCause()));
+ }
+ }
+
+ }
+ });
+ }
+
+ public void changeInvisibleTimeAsync(//
+ final String brokerName,
+ final String addr, //
+ final ChangeInvisibleTimeRequestHeader requestHeader,//
+ final long timeoutMillis,
+ final AckCallback ackCallback
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+ final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader);
+ this.remotingClient.invokeAsync(addr, request, timeoutMillis, new BaseInvokeCallback(MQClientAPIImpl.this) {
+ @Override
+ public void onComplete(ResponseFuture responseFuture) {
+ RemotingCommand response = responseFuture.getResponseCommand();
+ if (response != null) {
+ try {
+ ChangeInvisibleTimeResponseHeader responseHeader = (ChangeInvisibleTimeResponseHeader) response.decodeCommandCustomHeader(ChangeInvisibleTimeResponseHeader.class);
+ AckResult ackResult = new AckResult();
+ if (ResponseCode.SUCCESS == response.getCode()) {
+ ackResult.setStatus(AckStatus.OK);
+ ackResult.setPopTime(responseHeader.getPopTime());
+ ackResult.setExtraInfo(ExtraInfoUtil
+ .buildExtraInfo(requestHeader.getOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
+ responseHeader.getReviveQid(), requestHeader.getTopic(), brokerName, requestHeader.getQueueId()) + MessageConst.KEY_SEPARATOR
+ + requestHeader.getOffset());
+ } else {
+ ackResult.setStatus(AckStatus.NO_EXIST);
+ }
+ assert ackResult != null;
+ ackCallback.onSuccess(ackResult);
+ } catch (Exception e) {
+ ackCallback.onException(e);
+ }
+ } else {
+ if (!responseFuture.isSendRequestOK()) {
+ ackCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
+ } else if (responseFuture.isTimeout()) {
+ ackCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
+ responseFuture.getCause()));
+ } else {
+ ackCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
+ }
+ }
+ }
+ });
+ }
+
private void pullMessageAsync(
final String addr,
final RemotingCommand request,
@@ -801,6 +967,94 @@ public class MQClientAPIImpl {
responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}
+ /**
+  * Builds a {@link PopResult} from the broker's response to a pop request.
+  * <p>
+  * The response code selects the {@link PopStatus}: {@code SUCCESS} yields {@code FOUND} and decodes the
+  * response body into messages, {@code POLLING_FULL} yields {@code POLLING_FULL}, and both
+  * {@code POLLING_TIMEOUT} and {@code PULL_NOT_FOUND} yield {@code POLLING_NOT_FOUND}.
+  * <p>
+  * For a {@code FOUND} result that answers a {@link PopMessageRequestHeader}, every message is stamped with a
+  * {@code PROPERTY_POP_CK} checkpoint string built by {@code ExtraInfoUtil.buildExtraInfo}, and with
+  * {@code PROPERTY_FIRST_POP_TIME} when that property is not set yet. Every returned message has its topic
+  * replaced by {@code topic} with the client namespace stripped.
+  *
+  * @param brokerName broker name embedded in the checkpoint string
+  * @param response broker response to translate
+  * @param topic topic written back (without namespace) onto every returned message
+  * @param requestHeader header of the originating request; checkpoint properties are only written when it
+  * is a {@link PopMessageRequestHeader}
+  * @return the pop result carrying status, messages (may be {@code null} unless {@code FOUND}) and rest number
+  * @throws MQBrokerException if the response code is none of the codes listed above
+  * @throws RemotingCommandException declared for the response header decoding
+  */
+ private PopResult processPopResponse(final String brokerName, final RemotingCommand response, String topic,
+     CommandCustomHeader requestHeader) throws MQBrokerException, RemotingCommandException {
+     PopStatus popStatus = PopStatus.NO_NEW_MSG;
+     List<MessageExt> msgFoundList = null;
+     switch (response.getCode()) {
+         case ResponseCode.SUCCESS:
+             popStatus = PopStatus.FOUND;
+             ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
+             msgFoundList = MessageDecoder.decodes(byteBuffer);
+             break;
+         case ResponseCode.POLLING_FULL:
+             popStatus = PopStatus.POLLING_FULL;
+             break;
+         case ResponseCode.POLLING_TIMEOUT:
+         case ResponseCode.PULL_NOT_FOUND:
+             popStatus = PopStatus.POLLING_NOT_FOUND;
+             break;
+         default:
+             throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+
+     PopResult popResult = new PopResult(popStatus, msgFoundList);
+     PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.decodeCommandCustomHeader(PopMessageResponseHeader.class);
+     popResult.setRestNum(responseHeader.getRestNum());
+     if (popStatus != PopStatus.FOUND) {
+         // Nothing was returned, so there are no messages to decorate.
+         return popResult;
+     }
+
+     // Checkpoint info is only attached when the originating request was a pop request.
+     final boolean popRequest = requestHeader instanceof PopMessageRequestHeader;
+     Map<String, Long> startOffsetInfo = null;
+     Map<String, List<Long>> msgOffsetInfo = null;
+     Map<String, Integer> orderCountInfo = null;
+     if (popRequest) {
+         popResult.setInvisibleTime(responseHeader.getInvisibleTime());
+         popResult.setPopTime(responseHeader.getPopTime());
+         startOffsetInfo = ExtraInfoUtil.parseStartOffsetInfo(responseHeader.getStartOffsetInfo());
+         msgOffsetInfo = ExtraInfoUtil.parseMsgOffsetInfo(responseHeader.getMsgOffsetInfo());
+         orderCountInfo = ExtraInfoUtil.parseOrderCountInfo(responseHeader.getOrderCountInfo());
+     }
+
+     // Queue offsets of the returned messages grouped per queue key; a message's position in its list is
+     // the index used below to look up the matching entry of msgOffsetInfo.
+     Map<String/*topicMark@queueId*/, List<Long>/*msg queueOffset*/> sortMap = new HashMap<String, List<Long>>(16);
+     for (MessageExt messageExt : msgFoundList) {
+         String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
+         List<Long> offsets = sortMap.get(key);
+         if (offsets == null) {
+             offsets = new ArrayList<Long>(4);
+             sortMap.put(key, offsets);
+         }
+         offsets.add(messageExt.getQueueOffset());
+     }
+
+     // Per-queue checkpoint prefix, built from the first message seen for that queue.
+     Map<String, String> ckPrefixByQueue = new HashMap<String, String>(5);
+     for (MessageExt messageExt : msgFoundList) {
+         if (popRequest) {
+             if (startOffsetInfo == null) {
+                 // No start offset info in the response: use the first message's queue offset of each
+                 // queue for the checkpoint and append the message's own queue offset.
+                 String key = messageExt.getTopic() + messageExt.getQueueId();
+                 if (!ckPrefixByQueue.containsKey(key)) {
+                     ckPrefixByQueue.put(key, ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(),
+                         messageExt.getTopic(), brokerName, messageExt.getQueueId()));
+                 }
+                 messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, ckPrefixByQueue.get(key) + MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
+             } else {
+                 String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
+                 int index = sortMap.get(key).indexOf(messageExt.getQueueOffset());
+                 Long msgQueueOffset = msgOffsetInfo.get(key).get(index);
+                 if (msgQueueOffset != messageExt.getQueueOffset()) {
+                     // The logger substitutes "{}" placeholders; printf-style "%d"/"%s" would be printed verbatim.
+                     log.warn("Queue offset[{}] of msg is strange, not equal to the stored in msg, {}", msgQueueOffset, messageExt);
+                 }
+
+                 messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
+                     ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(key).longValue(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
+                         responseHeader.getReviveQid(), messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset.longValue())
+                 );
+                 if (((PopMessageRequestHeader) requestHeader).isOrder() && orderCountInfo != null) {
+                     Integer count = orderCountInfo.get(key);
+                     if (count != null && count > 0) {
+                         messageExt.setReconsumeTimes(count);
+                     }
+                 }
+             }
+             if (messageExt.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
+                 messageExt.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(responseHeader.getPopTime()));
+             }
+         }
+         messageExt.setTopic(NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace()));
+     }
+     return popResult;
+ }
+
public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader();
@@ -2262,4 +2516,24 @@ public class MQClientAPIImpl {
return false;
}
}
+
+ /**
+  * Synchronously sends a {@code SET_MESSAGE_REQUEST_MODE} request to a broker.
+  * <p>
+  * The request body carries the topic, consumer group, mode and pop share queue number. The target address
+  * is passed through {@code MixAll.brokerVIPChannel} using the client's VIP-channel setting.
+  *
+  * @param brokerAddr address of the broker to contact
+  * @param topic topic the mode applies to
+  * @param consumerGroup consumer group the mode applies to
+  * @param mode message request mode to set
+  * @param popShareQueueNum value written to the body's pop share queue number
+  * @param timeoutMillis timeout handed to the synchronous invocation
+  * @throws MQClientException if the broker answers with a code other than {@code SUCCESS}; it carries the
+  * response code and remark
+  */
+ public void setMessageRequestMode(final String brokerAddr, final String topic, final String consumerGroup,
+     final MessageRequestMode mode, final int popShareQueueNum, final long timeoutMillis)
+     throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+     RemotingConnectException, MQClientException {
+     final RemotingCommand modeRequest = RemotingCommand.createRequestCommand(RequestCode.SET_MESSAGE_REQUEST_MODE, null);
+
+     final SetMessageRequestModeRequestBody modeBody = new SetMessageRequestModeRequestBody();
+     modeBody.setTopic(topic);
+     modeBody.setConsumerGroup(consumerGroup);
+     modeBody.setMode(mode);
+     modeBody.setPopShareQueueNum(popShareQueueNum);
+     modeRequest.setBody(modeBody.encode());
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr);
+     final RemotingCommand modeResponse = this.remotingClient.invokeSync(targetAddr, modeRequest, timeoutMillis);
+     assert modeResponse != null;
+     if (ResponseCode.SUCCESS == modeResponse.getCode()) {
+         return;
+     }
+     throw new MQClientException(modeResponse.getCode(), modeResponse.getRemark());
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index b37f8a6..35102b4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -29,7 +29,6 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -237,6 +236,12 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
}
+ /**
+  * Pop consume requests are not handled by this service.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public void submitPopConsumeRequest(final List<MessageExt> msgs, final PopProcessQueue processQueue,
+     final MessageQueue messageQueue) {
+     throw new UnsupportedOperationException();
+ }
private void cleanExpireMsg() {
Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
@@ -386,6 +391,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
+ defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, consumerGroup);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 130effa..ecb3017 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -26,7 +26,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
@@ -39,17 +38,17 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.common.utils.ThreadUtils;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
@@ -205,6 +204,13 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
}
}
+ /**
+  * Pop consume requests are not handled by this service.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public void submitPopConsumeRequest(final List<MessageExt> msgs, final PopProcessQueue processQueue,
+     final MessageQueue messageQueue) {
+     throw new UnsupportedOperationException();
+ }
+
public synchronized void lockMQPeriodically() {
if (!this.stopped) {
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
similarity index 64%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
copy to client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
index b37f8a6..910f592 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
@@ -19,9 +19,7 @@ package org.apache.rocketmq.client.impl.consumer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -29,7 +27,8 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -41,15 +40,17 @@ import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
-public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
+public class ConsumeMessagePopConcurrentlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
private final DefaultMQPushConsumer defaultMQPushConsumer;
@@ -59,9 +60,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
private final String consumerGroup;
private final ScheduledExecutorService scheduledExecutorService;
- private final ScheduledExecutorService cleanExpireMsgExecutors;
- public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
+ public ConsumeMessagePopConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
@@ -79,24 +79,14 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
- this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
public void start() {
- this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- cleanExpireMsg();
- }
-
- }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
public void shutdown(long awaitTerminateMillis) {
this.scheduledExecutorService.shutdown();
ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
- this.cleanExpireMsgExecutors.shutdown();
}
@Override
@@ -110,32 +100,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
@Override
public void incCorePoolSize() {
- // long corePoolSize = this.consumeExecutor.getCorePoolSize();
- // if (corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax())
- // {
- // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
- // + 1);
- // }
- // log.info("incCorePoolSize Concurrently from {} to {}, ConsumerGroup:
- // {}",
- // corePoolSize,
- // this.consumeExecutor.getCorePoolSize(),
- // this.consumerGroup);
}
@Override
public void decCorePoolSize() {
- // long corePoolSize = this.consumeExecutor.getCorePoolSize();
- // if (corePoolSize > this.defaultMQPushConsumer.getConsumeThreadMin())
- // {
- // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
- // - 1);
- // }
- // log.info("decCorePoolSize Concurrently from {} to {}, ConsumerGroup:
- // {}",
- // corePoolSize,
- // this.consumeExecutor.getCorePoolSize(),
- // this.consumerGroup);
}
@Override
@@ -143,6 +111,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
return this.consumeExecutor.getCorePoolSize();
}
+
@Override
public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
@@ -186,7 +155,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
- ConsumeMessageConcurrentlyService.this.consumerGroup,
+ ConsumeMessagePopConcurrentlyService.this.consumerGroup,
msgs,
mq), e);
}
@@ -199,11 +168,16 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
@Override
- public void submitConsumeRequest(
+ public void submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue,
+ MessageQueue messageQueue, boolean dispathToConsume) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void submitPopConsumeRequest(
final List<MessageExt> msgs,
- final ProcessQueue processQueue,
- final MessageQueue messageQueue,
- final boolean dispatchToConsume) {
+ final PopProcessQueue processQueue,
+ final MessageQueue messageQueue) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
@@ -237,26 +211,17 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
}
-
- private void cleanExpireMsg() {
- Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
- this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<MessageQueue, ProcessQueue> next = it.next();
- ProcessQueue pq = next.getValue();
- pq.cleanExpiredMsg(this.defaultMQPushConsumer);
- }
- }
-
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
- final ConsumeRequest consumeRequest
- ) {
- int ackIndex = context.getAckIndex();
+ final ConsumeRequest consumeRequest) {
- if (consumeRequest.getMsgs().isEmpty())
+ if (consumeRequest.getMsgs().isEmpty()) {
return;
+ }
+
+ int ackIndex = context.getAckIndex();
+ String topic = consumeRequest.getMessageQueue().getTopic();
switch (status) {
case CONSUME_SUCCESS:
@@ -265,74 +230,94 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
- this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
- this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
+ this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, topic, ok);
+ this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, topic, failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
- this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
- consumeRequest.getMsgs().size());
+ this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, topic,
+ consumeRequest.getMsgs().size());
break;
default:
break;
}
- switch (this.defaultMQPushConsumer.getMessageModel()) {
- case BROADCASTING:
- for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
- MessageExt msg = consumeRequest.getMsgs().get(i);
- log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
- }
- break;
- case CLUSTERING:
- List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
- for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
- MessageExt msg = consumeRequest.getMsgs().get(i);
- boolean result = this.sendMessageBack(msg, context);
- if (!result) {
- msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
- msgBackFailed.add(msg);
- }
- }
+ //ack if consume success
+ for (int i = 0; i <= ackIndex; i++) {
+ this.defaultMQPushConsumerImpl.ackAsync(consumeRequest.getMsgs().get(i), consumerGroup);
+ consumeRequest.getPopProcessQueue().ack();
+ }
- if (!msgBackFailed.isEmpty()) {
- consumeRequest.getMsgs().removeAll(msgBackFailed);
+ //consume later if consume fail
+ for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
+ MessageExt msgExt = consumeRequest.getMsgs().get(i);
+ consumeRequest.getPopProcessQueue().ack();
+ if (msgExt.getReconsumeTimes() >= this.defaultMQPushConsumerImpl.getMaxReconsumeTimes()) {
+ checkNeedAckOrDelay(msgExt);
+ continue;
+ }
- this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
- }
- break;
- default:
- break;
+ int delayLevel = context.getDelayLevelWhenNextConsume();
+ changePopInvisibleTime(consumeRequest.getMsgs().get(i), consumerGroup, delayLevel);
}
+ }
+
+ /**
+  * Handles a failed message whose reconsume count has reached the consumer's maximum.
+  * <p>
+  * The message age is the time elapsed since its born timestamp. If the age exceeds twice the last entry of
+  * the pop delay-level table, the message is acked asynchronously. Otherwise its invisible time is changed
+  * using the level one above the highest level whose delay the age has already reached.
+  *
+  * @param msgExt message to ack or delay
+  */
+ private void checkNeedAckOrDelay(MessageExt msgExt) {
+     int[] delayLevelTable = this.defaultMQPushConsumerImpl.getPopDelayLevel();
+
+     long msgDelaytime = System.currentTimeMillis() - msgExt.getBornTimestamp();
+     // Multiply as long so a large delay entry cannot overflow int before the comparison.
+     if (msgDelaytime > delayLevelTable[delayLevelTable.length - 1] * 1000L * 2) {
+         log.warn("Consume too many times, ack message async. message {}", msgExt.toString());
+         this.defaultMQPushConsumerImpl.ackAsync(msgExt, consumerGroup);
+     } else {
+         int delayLevel = delayLevelTable.length - 1;
+         for (; delayLevel >= 0; delayLevel--) {
+             if (msgDelaytime >= delayLevelTable[delayLevel] * 1000L) {
+                 delayLevel++;
+                 break;
+             }
+         }
+         // No level matched: the age is below every table entry and the loop left delayLevel at -1.
+         // Clamp to 0 so changePopInvisibleTime cannot index the table with a negative value; that
+         // method treats 0 as "derive the level from the reconsume count".
+         if (delayLevel < 0) {
+             delayLevel = 0;
+         }
- long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
- if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
- this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
+         changePopInvisibleTime(msgExt, consumerGroup, delayLevel);
+         log.warn("Consume too many times, but delay time {} not enough. changePopInvisibleTime to delayLevel {} . message key:{}",
+             msgDelaytime, delayLevel, msgExt.getKeys());
+     }
+ }
- public ConsumerStatsManager getConsumerStatsManager() {
- return this.defaultMQPushConsumerImpl.getConsumerStatsManager();
- }
+ /**
+  * Asynchronously changes the invisible time of a popped message according to a delay level.
+  * <p>
+  * A {@code delayLevel} of 0 is replaced by {@code 3 + msg.getReconsumeTimes()}. A level at or beyond the
+  * end of the pop delay-level table uses the table's last entry. The selected entry is multiplied by 1000
+  * before being passed on. Failures are only logged.
+  * <p>
+  * NOTE(review): a negative {@code delayLevel} is not guarded and would throw
+  * ArrayIndexOutOfBoundsException on the table lookup, which happens before the try block.
+  *
+  * @param msg message whose {@code PROPERTY_POP_CK} property is sent as the extra info
+  * @param consumerGroup consumer group passed to the async call
+  * @param delayLevel index into the pop delay-level table, or 0 to derive it from the reconsume count
+  */
+ private void changePopInvisibleTime(final MessageExt msg, String consumerGroup, int delayLevel) {
+ if (0 == delayLevel) {
+ delayLevel = 3 + msg.getReconsumeTimes();
+ }
- public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
- int delayLevel = context.getDelayLevelWhenNextConsume();
+ int[] delayLevelTable = this.defaultMQPushConsumerImpl.getPopDelayLevel();
+ // Levels past the end of the table fall back to the last (presumably longest) delay -- TODO confirm ordering.
+ int delaySecond = delayLevel >= delayLevelTable.length ? delayLevelTable[delayLevelTable.length - 1] : delayLevelTable[delayLevel];
+ String extraInfo = msg.getProperty(MessageConst.PROPERTY_POP_CK);
- // Wrap topic with namespace before sending back message.
- msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
try {
- this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
- return true;
- } catch (Exception e) {
- log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
+ this.defaultMQPushConsumerImpl.changePopInvisibleTimeAsync(msg.getTopic(), consumerGroup, extraInfo,
+ delaySecond * 1000L, new AckCallback() {
+ // Success needs no follow-up here.
+ @Override
+ public void onSuccess(AckResult ackResult) {
+ }
+
+
+ // Asynchronous failure is logged with the exception's toString() only; no retry is attempted here.
+ @Override
+ public void onException(Throwable e) {
+ log.error("changePopInvisibleTimeAsync fail. msg:{} error info: {}", msg.toString(), e.toString());
+ }
+ });
+ } catch (Throwable t) {
+ // Failure to issue the request is logged and swallowed so the caller's loop continues.
+ log.error("changePopInvisibleTimeAsync fail, group:{} msg:{} errorInfo:{}", consumerGroup, msg.toString(), t.toString());
}
+ }
- return false;
+ public ConsumerStatsManager getConsumerStatsManager() {
+ return this.defaultMQPushConsumerImpl.getConsumerStatsManager();
}
private void submitConsumeRequestLater(
final List<MessageExt> msgs,
- final ProcessQueue processQueue,
+ final PopProcessQueue processQueue,
final MessageQueue messageQueue
) {
@@ -340,7 +325,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
@Override
public void run() {
- ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
+ ConsumeMessagePopConcurrentlyService.this.submitPopConsumeRequest(msgs, processQueue, messageQueue);
}
}, 5000, TimeUnit.MILLISECONDS);
}
@@ -352,44 +337,71 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
@Override
public void run() {
- ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
+ ConsumeMessagePopConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
}
}, 5000, TimeUnit.MILLISECONDS);
}
class ConsumeRequest implements Runnable {
private final List<MessageExt> msgs;
- private final ProcessQueue processQueue;
+ private final PopProcessQueue processQueue;
private final MessageQueue messageQueue;
+ private long popTime = 0;
+ private long invisibleTime = 0;
- public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
+ public ConsumeRequest(List<MessageExt> msgs, PopProcessQueue processQueue, MessageQueue messageQueue) {
this.msgs = msgs;
this.processQueue = processQueue;
this.messageQueue = messageQueue;
+
+ try {
+ String extraInfo = msgs.get(0).getProperty(MessageConst.PROPERTY_POP_CK);
+ String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
+ popTime = ExtraInfoUtil.getPopTime(extraInfoStrs);
+ invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfoStrs);
+ } catch (Throwable t) {
+ log.error("parse extra info error. msg:" + msgs.get(0), t);
+ }
+ }
+
+ public boolean isPopTimeout() {
+ if (msgs.size() == 0 || popTime <= 0 || invisibleTime <= 0) {
+ return true;
+ }
+
+ long current = System.currentTimeMillis();
+ return current - popTime >= invisibleTime;
}
public List<MessageExt> getMsgs() {
return msgs;
}
- public ProcessQueue getProcessQueue() {
+ public PopProcessQueue getPopProcessQueue() {
return processQueue;
}
@Override
public void run() {
if (this.processQueue.isDropped()) {
- log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
+ log.info("the message queue not be able to consume, because it's dropped(pop). group={} {}", ConsumeMessagePopConcurrentlyService.this.consumerGroup, this.messageQueue);
+ return;
+ }
+
+ if (isPopTimeout()) {
+ log.info("the pop message time out so abort consume. popTime={} invisibleTime={}, group={} {}",
+ popTime, invisibleTime, ConsumeMessagePopConcurrentlyService.this.consumerGroup, this.messageQueue);
+ processQueue.decFoundMsg(-msgs.size());
return;
}
- MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
+ MessageListenerConcurrently listener = ConsumeMessagePopConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
- if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
+ if (ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
@@ -397,7 +409,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
- ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
+ ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
@@ -413,7 +425,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
- ConsumeMessageConcurrentlyService.this.consumerGroup,
+ ConsumeMessagePopConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
@@ -425,7 +437,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
- } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
+ } else if (consumeRT >= invisibleTime * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
@@ -433,31 +445,30 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
returnType = ConsumeReturnType.SUCCESS;
}
- if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
- consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
- }
-
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
- ConsumeMessageConcurrentlyService.this.consumerGroup,
+ ConsumeMessagePopConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
- if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
- consumeMessageContext.setStatus(status.toString());
- consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
- ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
+ if (ConsumeMessagePopConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
+ consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
- ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
- .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
+ ConsumeMessagePopConcurrentlyService.this.getConsumerStatsManager()
+ .incConsumeRT(ConsumeMessagePopConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
- if (!processQueue.isDropped()) {
- ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
+ if (!processQueue.isDropped() && !isPopTimeout()) {
+ ConsumeMessagePopConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
- log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
+ if (msgs != null) {
+ processQueue.decFoundMsg(-msgs.size());
+ }
+
+ log.warn("processQueue invalid. isDropped={}, isPopTimeout={}, messageQueue={}, msgs={}",
+ processQueue.isDropped(), isPopTimeout(), messageQueue, msgs);
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java
new file mode 100644
index 0000000..48e0336
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import io.netty.util.internal.ConcurrentSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.stat.ConsumerStatsManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.body.CMResult;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class ConsumeMessagePopOrderlyService implements ConsumeMessageService {
+ private static final InternalLogger log = ClientLogger.getLog();
+ private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
+ private final DefaultMQPushConsumer defaultMQPushConsumer;
+ private final MessageListenerOrderly messageListener;
+ private final BlockingQueue<Runnable> consumeRequestQueue;
+ private final ConcurrentSet<ConsumeRequest> consumeRequestSet = new ConcurrentSet<ConsumeRequest>();
+ private final ThreadPoolExecutor consumeExecutor;
+ private final String consumerGroup;
+ private final MessageQueueLock messageQueueLock = new MessageQueueLock();
+ private final MessageQueueLock consumeRequestLock = new MessageQueueLock();
+ private final ScheduledExecutorService scheduledExecutorService;
+ private volatile boolean stopped = false;
+
+ public ConsumeMessagePopOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
+ MessageListenerOrderly messageListener) {
+ this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
+ this.messageListener = messageListener;
+
+ this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
+ this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
+ this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
+
+ this.consumeExecutor = new ThreadPoolExecutor(
+ this.defaultMQPushConsumer.getConsumeThreadMin(),
+ this.defaultMQPushConsumer.getConsumeThreadMax(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.consumeRequestQueue,
+ new ThreadFactoryImpl("ConsumeMessageThread_"));
+
+ this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
+ }
+
+ /**
+  * Starts the service. In clustering mode this schedules {@link #lockMQPeriodically()} at a fixed rate of
+  * {@code ProcessQueue.REBALANCE_LOCK_INTERVAL} milliseconds after an initial one second delay; in any other
+  * message model nothing is scheduled.
+  */
+ @Override
+ public void start() {
+     if (!MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
+         return;
+     }
+
+     final Runnable lockTask = new Runnable() {
+         @Override
+         public void run() {
+             ConsumeMessagePopOrderlyService.this.lockMQPeriodically();
+         }
+     };
+     this.scheduledExecutorService.scheduleAtFixedRate(
+         lockTask, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Stops the service: marks it stopped, shuts down the scheduler, shuts down the consume executor through
+  * {@code ThreadUtils.shutdownGracefully}, and in clustering mode unlocks all message queues.
+  *
+  * @param awaitTerminateMillis timeout in milliseconds passed to the graceful executor shutdown
+  */
+ @Override
+ public void shutdown(long awaitTerminateMillis) {
+     this.stopped = true;
+     this.scheduledExecutorService.shutdown();
+     ThreadUtils.shutdownGracefully(this.consumeExecutor, awaitTerminateMillis, TimeUnit.MILLISECONDS);
+
+     final boolean clustering = MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel());
+     if (clustering) {
+         this.unlockAllMessageQueues();
+     }
+ }
+
+ public synchronized void unlockAllMessageQueues() { // Delegates to the rebalance implementation to unlock all queues; shutdown() calls this in clustering mode.
+ this.defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false);
+ }
+
+ /**
+  * Applies a new core pool size to the consume executor, but only when the value is positive, not greater
+  * than {@code Short.MAX_VALUE}, and strictly less than the consumer's {@code consumeThreadMax}. Any other
+  * value is ignored.
+  *
+  * @param corePoolSize the requested core pool size
+  */
+ @Override
+ public void updateCorePoolSize(int corePoolSize) {
+     if (corePoolSize <= 0 || corePoolSize > Short.MAX_VALUE) {
+         return;
+     }
+     if (corePoolSize >= this.defaultMQPushConsumer.getConsumeThreadMax()) {
+         return;
+     }
+     this.consumeExecutor.setCorePoolSize(corePoolSize);
+ }
+
+ @Override
+ public void incCorePoolSize() { // No-op in this implementation: the core pool size is left unchanged.
+ }
+
+ @Override
+ public void decCorePoolSize() { // No-op in this implementation: the core pool size is left unchanged.
+ }
+
+ /**
+  * @return the current core pool size of the consume executor
+  */
+ @Override
+ public int getCorePoolSize() {
+     final int currentCoreSize = this.consumeExecutor.getCorePoolSize();
+     return currentCoreSize;
+ }
+
+ /**
+  * Consumes a single message synchronously through the orderly listener and reports the outcome.
+  * <p>
+  * The message is wrapped in a one-element list, a {@link MessageQueue} is built from {@code brokerName} and
+  * the message's topic and queue id, and the listener status is mapped to a {@code CMResult}. A {@code null}
+  * status maps to {@code CR_RETURN_NULL}; any throwable from the listener maps to {@code CR_THROW_EXCEPTION}
+  * with a short description stored as the remark.
+  *
+  * @param msg the message to consume
+  * @param brokerName the broker name used to build the message queue for the listener context
+  * @return the result holding the consume outcome, the auto-commit flag and the elapsed time
+  */
+ @Override
+ public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
+     final ConsumeMessageDirectlyResult directlyResult = new ConsumeMessageDirectlyResult();
+     directlyResult.setOrder(true);
+
+     final List<MessageExt> singleMsgList = new ArrayList<MessageExt>();
+     singleMsgList.add(msg);
+
+     final MessageQueue queue = new MessageQueue();
+     queue.setBrokerName(brokerName);
+     queue.setTopic(msg.getTopic());
+     queue.setQueueId(msg.getQueueId());
+
+     final ConsumeOrderlyContext orderlyContext = new ConsumeOrderlyContext(queue);
+
+     this.defaultMQPushConsumerImpl.resetRetryAndNamespace(singleMsgList, this.consumerGroup);
+
+     final long startMillis = System.currentTimeMillis();
+
+     log.info("consumeMessageDirectly receive new message: {}", msg);
+
+     try {
+         final ConsumeOrderlyStatus listenerStatus = this.messageListener.consumeMessage(singleMsgList, orderlyContext);
+         if (listenerStatus == null) {
+             directlyResult.setConsumeResult(CMResult.CR_RETURN_NULL);
+         } else if (listenerStatus == ConsumeOrderlyStatus.COMMIT) {
+             directlyResult.setConsumeResult(CMResult.CR_COMMIT);
+         } else if (listenerStatus == ConsumeOrderlyStatus.ROLLBACK) {
+             directlyResult.setConsumeResult(CMResult.CR_ROLLBACK);
+         } else if (listenerStatus == ConsumeOrderlyStatus.SUCCESS) {
+             directlyResult.setConsumeResult(CMResult.CR_SUCCESS);
+         } else if (listenerStatus == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) {
+             directlyResult.setConsumeResult(CMResult.CR_LATER);
+         }
+     } catch (Throwable e) {
+         final String failureDesc = RemotingHelper.exceptionSimpleDesc(e);
+         directlyResult.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
+         directlyResult.setRemark(failureDesc);
+
+         log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
+             failureDesc,
+             this.consumerGroup,
+             singleMsgList,
+             queue), e);
+     }
+
+     directlyResult.setAutoCommit(orderlyContext.isAutoCommit());
+     directlyResult.setSpentTimeMills(System.currentTimeMillis() - startMillis);
+
+     log.info("consumeMessageDirectly Result: {}", directlyResult);
+
+     return directlyResult;
+ }
+
+ @Override
+ public void submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, // Not supported by this pop service; always throws.
+ MessageQueue messageQueue, boolean dispathToConsume) {
+ throw new UnsupportedOperationException(); // this class accepts work through submitPopConsumeRequest instead
+ }
+
+ /**
+  * Submits a consume request for the given pop process queue and message queue (non-forced, so an equal
+  * request that is already tracked is not submitted again).
+  * <p>
+  * NOTE(review): {@code msgs} is not referenced by this implementation - confirm where the popped messages
+  * are expected to be picked up from.
+  *
+  * @param msgs the popped messages (unused here)
+  * @param processQueue the pop process queue the request belongs to
+  * @param messageQueue the message queue the request targets
+  */
+ @Override
+ public void submitPopConsumeRequest(final List<MessageExt> msgs,
+     final PopProcessQueue processQueue,
+     final MessageQueue messageQueue) {
+     submitConsumeRequest(new ConsumeRequest(processQueue, messageQueue), false);
+ }
+
+ /**
+  * Asks the rebalance implementation to lock all queues, unless the service has been stopped.
+  * Scheduled periodically by {@link #start()} in clustering mode.
+  */
+ public synchronized void lockMQPeriodically() {
+     if (this.stopped) {
+         return;
+     }
+     this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
+ }
+
+ /**
+  * Stops tracking the given request so that an equal request can be submitted again.
+  *
+  * @param consumeRequest the request to remove from the tracked set
+  */
+ private void removeConsumeRequest(final ConsumeRequest consumeRequest) {
+     this.consumeRequestSet.remove(consumeRequest);
+ }
+
+ /**
+  * Submits a consume request to the consume executor, de-duplicating equal requests.
+  * <p>
+  * The request is added to the tracked set under the lock object for its (message queue, sharding key index)
+  * pair. It is handed to the executor when it was not tracked before, or unconditionally when {@code force}
+  * is set (used for delayed re-submission).
+  * <p>
+  * If the executor refuses the task, the request is removed from the tracked set again. Otherwise it would
+  * stay tracked with nothing left to run it, and every later non-forced submission of an equal request
+  * would be silently dropped.
+  *
+  * @param consumeRequest the request to submit
+  * @param force when {@code true}, submit even if an equal request is already tracked
+  */
+ private void submitConsumeRequest(final ConsumeRequest consumeRequest, boolean force) {
+     Object lock = consumeRequestLock.fetchLockObject(consumeRequest.getMessageQueue(), consumeRequest.getShardingKeyIndex());
+     synchronized (lock) {
+         boolean isNewReq = consumeRequestSet.add(consumeRequest);
+         if (force || isNewReq) {
+             try {
+                 consumeExecutor.submit(consumeRequest);
+             } catch (Exception e) {
+                 // nothing will run this request any more, so it must not keep blocking equal requests
+                 removeConsumeRequest(consumeRequest);
+                 // pass the exception as the throwable argument so the stack trace is not lost
+                 log.error(String.format("error submit consume request: %s, mq: %s, shardingKeyIndex: %s",
+                     e.toString(), consumeRequest.getMessageQueue(), consumeRequest.getShardingKeyIndex()), e);
+             }
+         }
+     }
+ }
+
+ private void submitConsumeRequestLater(final ConsumeRequest consumeRequest, final long suspendTimeMillis) {
+ long timeMillis = suspendTimeMillis;
+ if (timeMillis == -1) {
+ timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
+ }
+
+ if (timeMillis < 10) {
+ timeMillis = 10;
+ } else if (timeMillis > 30000) {
+ timeMillis = 30000;
+ }
+
+ this.scheduledExecutorService.schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ submitConsumeRequest(consumeRequest, true);
+ }
+ }, timeMillis, TimeUnit.MILLISECONDS);
+ }
+
+ public boolean processConsumeResult( // Placeholder: ignores all of its arguments and always returns true.
+ final List<MessageExt> msgs,
+ final ConsumeOrderlyStatus status,
+ final ConsumeOrderlyContext context,
+ final ConsumeRequest consumeRequest
+ ) {
+ return true; // NOTE(review): no result handling is implemented here - confirm this is intentional
+ }
+
+ /**
+  * @return the stats manager of the owning push consumer implementation
+  */
+ public ConsumerStatsManager getConsumerStatsManager() {
+     final ConsumerStatsManager statsManager = this.defaultMQPushConsumerImpl.getConsumerStatsManager();
+     return statsManager;
+ }
+
+ private int getMaxReconsumeTimes() {
+ // default reconsume times: Integer.MAX_VALUE
+ if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
+ return Integer.MAX_VALUE;
+ } else {
+ return this.defaultMQPushConsumer.getMaxReconsumeTimes();
+ }
+ }
+
+ private boolean checkReconsumeTimes(List<MessageExt> msgs) {
+ boolean suspend = false;
+ if (msgs != null && !msgs.isEmpty()) {
+ for (MessageExt msg : msgs) {
+ if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
+ MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
+ if (!sendMessageBack(msg)) {
+ suspend = true;
+ msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
+ }
+ } else {
+ suspend = true;
+ msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
+ }
+ }
+ }
+ return suspend;
+ }
+
+ /**
+  * Re-publishes the given message to this consumer group's retry topic using the client's default producer.
+  * <p>
+  * The new message carries the original body, flag and properties, records the original message id and topic,
+  * the current and max reconsume times, and uses delay level {@code 3 + reconsumeTimes}.
+  *
+  * @param msg the message to send back
+  * @return {@code true} if the send completed without an exception, {@code false} otherwise (the failure is logged)
+  */
+ public boolean sendMessageBack(final MessageExt msg) {
+     try {
+         // max reconsume times exceeded then send to dead letter queue.
+         final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
+         final Message retryMsg = new Message(retryTopic, msg.getBody());
+
+         final String originMsgId = MessageAccessor.getOriginMessageId(msg);
+         final String effectiveOriginId = UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId;
+         MessageAccessor.setOriginMessageId(retryMsg, effectiveOriginId);
+
+         retryMsg.setFlag(msg.getFlag());
+         MessageAccessor.setProperties(retryMsg, msg.getProperties());
+         MessageAccessor.putProperty(retryMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
+         MessageAccessor.setReconsumeTime(retryMsg, String.valueOf(msg.getReconsumeTimes()));
+         MessageAccessor.setMaxReconsumeTimes(retryMsg, String.valueOf(getMaxReconsumeTimes()));
+         retryMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
+
+         this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(retryMsg);
+         return true;
+     } catch (Exception e) {
+         log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
+         return false;
+     }
+ }
+
+ /**
+  * Strips the consumer's namespace from the topic of every given message. Does not modify the messages
+  * when the consumer has no (or an empty) namespace.
+  *
+  * @param msgs the messages whose topics are rewritten in place
+  */
+ public void resetNamespace(final List<MessageExt> msgs) {
+     final String namespace = this.defaultMQPushConsumer.getNamespace();
+     for (MessageExt msg : msgs) {
+         if (StringUtils.isNotEmpty(namespace)) {
+             msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), namespace));
+         }
+     }
+ }
+
+ /**
+  * A unit of consume work identified by its pop process queue, message queue and sharding key index.
+  * <p>
+  * Two requests are equal when they have the same sharding key index, the same process queue instance and
+  * equal message queues; the enclosing service relies on this to de-duplicate submissions.
+  */
+ class ConsumeRequest implements Runnable {
+     private final PopProcessQueue processQueue;
+     private final MessageQueue messageQueue;
+     private int shardingKeyIndex = 0;
+
+     /**
+      * Creates a request with sharding key index 0.
+      */
+     public ConsumeRequest(PopProcessQueue processQueue, MessageQueue messageQueue) {
+         this(processQueue, messageQueue, 0);
+     }
+
+     /**
+      * Creates a request for an explicit sharding key index.
+      */
+     public ConsumeRequest(PopProcessQueue processQueue, MessageQueue messageQueue, int shardingKeyIndex) {
+         this.processQueue = processQueue;
+         this.messageQueue = messageQueue;
+         this.shardingKeyIndex = shardingKeyIndex;
+     }
+
+     public PopProcessQueue getProcessQueue() {
+         return this.processQueue;
+     }
+
+     public MessageQueue getMessageQueue() {
+         return this.messageQueue;
+     }
+
+     public int getShardingKeyIndex() {
+         return this.shardingKeyIndex;
+     }
+
+     /**
+      * Runs the request. When the process queue is dropped, logs a warning, untracks this request and
+      * returns. Otherwise only the lock object for (message queue, sharding key index) is fetched.
+      * <p>
+      * NOTE(review): the fetched lock object is not used and no messages are consumed here - this looks
+      * like an unfinished implementation; confirm.
+      */
+     @Override
+     public void run() {
+         final boolean dropped = this.processQueue.isDropped();
+         if (dropped) {
+             log.warn("run, message queue not be able to consume, because it's dropped. {}", this.messageQueue);
+             ConsumeMessagePopOrderlyService.this.removeConsumeRequest(this);
+             return;
+         }
+
+         // lock on sharding key index
+         final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue, this.shardingKeyIndex);
+     }
+
+     @Override
+     public int hashCode() {
+         int hash = this.shardingKeyIndex;
+         hash += this.processQueue == null ? 0 : this.processQueue.hashCode() * 31;
+         hash += this.messageQueue == null ? 0 : this.messageQueue.hashCode() * 31;
+         return hash;
+     }
+
+     @Override
+     public boolean equals(Object obj) {
+         if (this == obj) {
+             return true;
+         }
+         if (obj == null || getClass() != obj.getClass()) {
+             return false;
+         }
+
+         final ConsumeRequest that = (ConsumeRequest) obj;
+         // the process queue is compared by identity, the message queue by value
+         if (this.shardingKeyIndex != that.shardingKeyIndex || this.processQueue != that.processQueue) {
+             return false;
+         }
+         return this.messageQueue == that.messageQueue
+             || (this.messageQueue != null && this.messageQueue.equals(that.messageQueue));
+     }
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
index 5078c97..bdde6ff 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
@@ -41,4 +41,9 @@ public interface ConsumeMessageService {
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume);
+
+ void submitPopConsumeRequest( // Submits messages obtained by a pop request for consumption.
+ final List<MessageExt> msgs, // the messages found by the pop request
+ final PopProcessQueue processQueue, // the pop process queue of that request
+ final MessageQueue messageQueue); // the message queue the pop request targeted
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index f832370..5cf814b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -142,6 +142,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
+ // only for test purpose, will be modified by reflection in unit test.
+ @SuppressWarnings("FieldMayBeFinal") private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
+
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
@@ -417,6 +420,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
+ if (doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged) {
+ return;
+ }
Map<String, SubscriptionData> subTable = rebalanceImpl.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index e08f780..64126b0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -27,12 +27,17 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
@@ -46,39 +51,47 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.hook.FilterMessageContext;
import org.apache.rocketmq.client.hook.FilterMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.PopProcessQueueInfo;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
+
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
/**
* Delay some time when exception occur
@@ -109,9 +122,20 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private MessageListener messageListenerInner;
private OffsetStore offsetStore;
private ConsumeMessageService consumeMessageService;
+ private ConsumeMessageService consumeMessagePopService;
private long queueFlowControlTimes = 0;
private long queueMaxSpanFlowControlTimes = 0;
+ //10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
+ private int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
+
+ private static final int MAX_POP_INVISIBLE_TIME = 300000;
+ private static final int MIN_POP_INVISIBLE_TIME = 5000;
+ private static final int ASYNC_TIMEOUT = 3000;
+
+ // only for test purpose, will be modified by reflection in unit test.
+ @SuppressWarnings("FieldMayBeFinal") private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
+
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
@@ -443,6 +467,169 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
}
+ /**
+  * Issues one asynchronous pop for the queue described by {@code popRequest} and re-schedules the request
+  * according to the outcome.
+  * <p>
+  * The request is abandoned when its process queue is dropped. It is retried later, without contacting the
+  * broker, when the consumer is not in a usable state, is paused, has more messages waiting for ack than
+  * {@code popThresholdForQueue}, or has no subscription for the topic. Otherwise the pop is sent; found
+  * messages are filtered, handed to the pop consume service, and the next pop is scheduled from the callback.
+  * <p>
+  * A configured invisible time outside [{@code MIN_POP_INVISIBLE_TIME}, {@code MAX_POP_INVISIBLE_TIME}] is
+  * replaced by 60000 ms.
+  *
+  * @param popRequest the pop request to execute
+  */
+ void popMessage(final PopRequest popRequest) {
+     final PopProcessQueue processQueue = popRequest.getPopProcessQueue();
+     if (processQueue.isDropped()) {
+         log.info("the pop request[{}] is dropped.", popRequest.toString());
+         return;
+     }
+
+     processQueue.setLastPopTimestamp(System.currentTimeMillis());
+
+     try {
+         this.makeSureStateOK();
+     } catch (MQClientException e) {
+         log.warn("popMessage exception, consumer state not ok", e);
+         this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
+         return;
+     }
+
+     if (this.isPause()) {
+         log.warn("consumer was paused, execute pop request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
+         this.executePopPullRequestLater(popRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
+         return;
+     }
+
+     // flow control: too many popped messages are still waiting to be acked
+     if (processQueue.getWaiAckMsgCount() > this.defaultMQPushConsumer.getPopThresholdForQueue()) {
+         this.executePopPullRequestLater(popRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+         if ((queueFlowControlTimes++ % 1000) == 0) {
+             log.warn("the messages waiting to ack exceeds the threshold {}, so do flow control, popRequest={}, flowControlTimes={}, wait count={}",
+                 this.defaultMQPushConsumer.getPopThresholdForQueue(), popRequest, queueFlowControlTimes, processQueue.getWaiAckMsgCount());
+         }
+         return;
+     }
+
+     //POPTODO think of pop mode orderly implementation later.
+     final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(popRequest.getMessageQueue().getTopic());
+     if (null == subscriptionData) {
+         this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
+         log.warn("find the consumer's subscription failed, {}", popRequest);
+         return;
+     }
+
+     final long beginTimestamp = System.currentTimeMillis();
+
+     PopCallback popCallback = new PopCallback() {
+         @Override
+         public void onSuccess(PopResult popResult) {
+             if (popResult == null) {
+                 log.error("pop callback popResult is null");
+                 DefaultMQPushConsumerImpl.this.executePopPullRequestImmediately(popRequest);
+                 return;
+             }
+
+             // client-side filtering; messages filtered out here are acked by processPopResult
+             processPopResult(popResult, subscriptionData);
+
+             switch (popResult.getPopStatus()) {
+                 case FOUND:
+                     long pullRT = System.currentTimeMillis() - beginTimestamp;
+                     DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(popRequest.getConsumerGroup(),
+                         popRequest.getMessageQueue().getTopic(), pullRT);
+                     if (popResult.getMsgFoundList() == null || popResult.getMsgFoundList().isEmpty()) {
+                         DefaultMQPushConsumerImpl.this.executePopPullRequestImmediately(popRequest);
+                     } else {
+                         DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(popRequest.getConsumerGroup(),
+                             popRequest.getMessageQueue().getTopic(), popResult.getMsgFoundList().size());
+                         popRequest.getPopProcessQueue().incFoundMsg(popResult.getMsgFoundList().size());
+
+                         DefaultMQPushConsumerImpl.this.consumeMessagePopService.submitPopConsumeRequest(
+                             popResult.getMsgFoundList(),
+                             processQueue,
+                             popRequest.getMessageQueue());
+
+                         if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
+                             DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest,
+                                 DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
+                         } else {
+                             DefaultMQPushConsumerImpl.this.executePopPullRequestImmediately(popRequest);
+                         }
+                     }
+                     break;
+                 case NO_NEW_MSG:
+                 case POLLING_NOT_FOUND:
+                     DefaultMQPushConsumerImpl.this.executePopPullRequestImmediately(popRequest);
+                     break;
+                 case POLLING_FULL:
+                     DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
+                     break;
+                 default:
+                     DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
+                     break;
+             }
+
+         }
+
+         @Override
+         public void onException(Throwable e) {
+             // failures on retry topics are not logged
+             if (!popRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                 // pass the throwable as the exception argument so the stack trace is logged
+                 log.warn("execute the pop request exception", e);
+             }
+
+             DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
+         }
+     };
+
+
+     try {
+
+         long invisibleTime = this.defaultMQPushConsumer.getPopInvisibleTime();
+         if (invisibleTime < MIN_POP_INVISIBLE_TIME || invisibleTime > MAX_POP_INVISIBLE_TIME) {
+             invisibleTime = 60000;
+         }
+         this.pullAPIWrapper.popAsync(popRequest.getMessageQueue(), invisibleTime, this.defaultMQPushConsumer.getPopBatchNums(),
+             popRequest.getConsumerGroup(), BROKER_SUSPEND_MAX_TIME_MILLIS, popCallback, true, popRequest.getInitMode(),
+             false, subscriptionData.getExpressionType(), subscriptionData.getSubString());
+     } catch (Exception e) {
+         log.error("popAsync exception", e);
+         this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
+     }
+ }
+
+ private PopResult processPopResult(final PopResult popResult, final SubscriptionData subscriptionData) {
+ if (PopStatus.FOUND == popResult.getPopStatus()) {
+ List<MessageExt> msgFoundList = popResult.getMsgFoundList();
+ List<MessageExt> msgListFilterAgain = msgFoundList;
+ if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()
+ && popResult.getMsgFoundList().size() > 0) {
+ msgListFilterAgain = new ArrayList<MessageExt>(popResult.getMsgFoundList().size());
+ for (MessageExt msg : popResult.getMsgFoundList()) {
+ if (msg.getTags() != null) {
+ if (subscriptionData.getTagsSet().contains(msg.getTags())) {
+ msgListFilterAgain.add(msg);
+ }
+ }
+ }
+ }
+
+ if (!this.filterMessageHookList.isEmpty()) {
+ FilterMessageContext filterMessageContext = new FilterMessageContext();
+ filterMessageContext.setUnitMode(this.defaultMQPushConsumer.isUnitMode());
+ filterMessageContext.setMsgList(msgListFilterAgain);
+ if (!this.filterMessageHookList.isEmpty()) {
+ for (FilterMessageHook hook : this.filterMessageHookList) {
+ try {
+ hook.filterMessage(filterMessageContext);
+ } catch (Throwable e) {
+ log.error("execute hook error. hookName={}", hook.hookName());
+ }
+ }
+ }
+ }
+
+ if (msgFoundList.size() != msgListFilterAgain.size()) {
+ for (MessageExt msg : msgFoundList) {
+ if (!msgListFilterAgain.contains(msg)) {
+ ackAsync(msg, this.groupName());
+ }
+ }
+ }
+
+ popResult.setMsgFoundList(msgListFilterAgain);
+ }
+
+ return popResult;
+ }
+
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The consumer service state not OK, "
@@ -452,7 +639,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
}
- private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
+ void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
}
@@ -472,6 +659,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
+ void executePopPullRequestLater(final PopRequest pullRequest, final long timeDelay) {
+ this.mQClientFactory.getPullMessageService().executePopPullRequestLater(pullRequest, timeDelay);
+ }
+
+ void executePopPullRequestImmediately(final PopRequest pullRequest) {
+ this.mQClientFactory.getPullMessageService().executePopPullRequestImmediately(pullRequest);
+ }
+
private void correctTagsOffset(final PullRequest pullRequest) {
if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
@@ -531,7 +726,78 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
}
- private int getMaxReconsumeTimes() {
+ void ackAsync(MessageExt message, String consumerGroup) {
+ final String extraInfo = message.getProperty(MessageConst.PROPERTY_POP_CK);
+
+ try {
+ String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
+ String brokerName = ExtraInfoUtil.getBrokerName(extraInfoStrs);
+ int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs);
+ long queueOffset = ExtraInfoUtil.getQueueOffset(extraInfoStrs);
+ String topic = message.getTopic();
+
+ FindBrokerResult
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+ if (null == findBrokerResult) {
+ this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+ }
+
+ if (findBrokerResult == null) {
+ log.error("The broker[" + brokerName + "] not exist");
+ return;
+ }
+
+ AckMessageRequestHeader requestHeader = new AckMessageRequestHeader();
+ requestHeader.setTopic(ExtraInfoUtil.getRealTopic(extraInfoStrs, topic, consumerGroup));
+ requestHeader.setQueueId(queueId);
+ requestHeader.setOffset(queueOffset);
+ requestHeader.setConsumerGroup(consumerGroup);
+ requestHeader.setExtraInfo(extraInfo);
+ this.mQClientFactory.getMQClientAPIImpl().ackMessageAsync(findBrokerResult.getBrokerAddr(), ASYNC_TIMEOUT, new AckCallback() {
+ @Override
+ public void onSuccess(AckResult ackResult) {
+ if (ackResult != null && !AckStatus.OK.equals(ackResult.getStatus())) {
+ log.info("Ack message fail. ackResult: {}, extraInfo: {}", ackResult, extraInfo);
+ }
+ }
+ @Override
+ public void onException(Throwable e) {
+ log.info("Ack message fail. extraInfo: {} error message: {}", extraInfo, e.toString());
+ }
+ }, requestHeader);
+
+ } catch (Throwable t) {
+ log.error("ack async error.", t);
+ }
+ }
+
+ void changePopInvisibleTimeAsync(String topic, String consumerGroup, String extraInfo, long invisibleTime, AckCallback callback)
+ throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+ String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
+ String brokerName = ExtraInfoUtil.getBrokerName(extraInfoStrs);
+ int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs);
+ FindBrokerResult
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+ if (null == findBrokerResult) {
+ this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+ }
+ if (findBrokerResult != null) {
+ ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader();
+ requestHeader.setTopic(ExtraInfoUtil.getRealTopic(extraInfoStrs, topic, consumerGroup));
+ requestHeader.setQueueId(queueId);
+ requestHeader.setOffset(ExtraInfoUtil.getQueueOffset(extraInfoStrs));
+ requestHeader.setConsumerGroup(consumerGroup);
+ requestHeader.setExtraInfo(extraInfo);
+ requestHeader.setInvisibleTime(invisibleTime);
+ this.mQClientFactory.getMQClientAPIImpl().changeInvisibleTimeAsync(brokerName, findBrokerResult.getBrokerAddr(), requestHeader, ASYNC_TIMEOUT, callback);
+ return;
+ }
+ throw new MQClientException("The broker[" + brokerName + "] not exist", null);
+ }
+
+ public int getMaxReconsumeTimes() {
// default reconsume times: 16
if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
return 16;
@@ -612,13 +878,20 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
+ //POPTODO reuse Executor ?
+ this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
+ //POPTODO reuse Executor ?
+ this.consumeMessagePopService =
+ new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
+ // POPTODO
+ this.consumeMessagePopService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
@@ -818,6 +1091,23 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
+
+ // popInvisibleTime
+ if (this.defaultMQPushConsumer.getPopInvisibleTime() < MIN_POP_INVISIBLE_TIME
+ || this.defaultMQPushConsumer.getPopInvisibleTime() > MAX_POP_INVISIBLE_TIME) {
+ throw new MQClientException(
+ "popInvisibleTime Out of range [" + MIN_POP_INVISIBLE_TIME + ", " + MAX_POP_INVISIBLE_TIME + "]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // popBatchNums
+ if (this.defaultMQPushConsumer.getPopBatchNums() <= 0 || this.defaultMQPushConsumer.getPopBatchNums() > 32) {
+ throw new MQClientException(
+ "popBatchNums Out of range [1, 32]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
}
private void copySubscription() throws MQClientException {
@@ -857,6 +1147,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
+ if (doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged) {
+ return;
+ }
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
@@ -1074,6 +1367,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
info.getMqTable().put(mq, pqinfo);
}
+ Iterator<Entry<MessageQueue, PopProcessQueue>> popIt = this.rebalanceImpl.getPopProcessQueueTable().entrySet().iterator();
+ while (popIt.hasNext()) {
+ Entry<MessageQueue, PopProcessQueue> next = popIt.next();
+ MessageQueue mq = next.getKey();
+ PopProcessQueue pq = next.getValue();
+
+ PopProcessQueueInfo pqinfo = new PopProcessQueueInfo();
+ pq.fillPopProcessQueueInfo(pqinfo);
+ info.getMqPopTable().put(mq, pqinfo);
+ }
+
for (SubscriptionData sd : subSet) {
ConsumeStatus consumeStatus = this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(), sd.getTopic());
info.getStatusTable().put(sd.getTopic(), consumeStatus);
@@ -1142,6 +1446,19 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
return queueTimeSpan;
}
+ /**
+ * Replaces the topic of each message whose topic starts with
+ * {@code MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup + "_"} by the value that
+ * {@code KeyBuilder.parseNormalTopic} returns for it. Messages are left untouched when the prefix
+ * does not match or when the parsed topic is {@code null} or empty.
+ *
+ * @param msgs messages to inspect; their topic is modified in place
+ * @param consumerGroup consumer group used to build the prefix and to parse the topic
+ */
+ public void tryResetPopRetryTopic(final List<MessageExt> msgs, String consumerGroup) {
+ final String retryPrefix = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup + "_";
+ for (MessageExt message : msgs) {
+ final String currentTopic = message.getTopic();
+ if (!currentTopic.startsWith(retryPrefix)) {
+ continue;
+ }
+ final String parsedTopic = KeyBuilder.parseNormalTopic(currentTopic, consumerGroup);
+ if (parsedTopic == null || parsedTopic.isEmpty()) {
+ continue;
+ }
+ message.setTopic(parsedTopic);
+ }
+ }
+
+
public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
for (MessageExt msg : msgs) {
@@ -1168,4 +1485,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
}
+
+ int[] getPopDelayLevel() {
+ return popDelayLevel; // hands back the array reference itself; no defensive copy is made
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
index a02f1b6..73453b0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
@@ -24,19 +24,32 @@ import org.apache.rocketmq.common.message.MessageQueue;
* Message lock,strictly ensure the single queue only one thread at a time consuming
*/
public class MessageQueueLock {
- private ConcurrentMap<MessageQueue, Object> mqLockTable =
- new ConcurrentHashMap<MessageQueue, Object>();
+ private ConcurrentMap<MessageQueue, ConcurrentMap<Integer, Object>> mqLockTable =
+ new ConcurrentHashMap<MessageQueue, ConcurrentMap<Integer, Object>>(32); // queue -> (sharding key index -> lock object)
public Object fetchLockObject(final MessageQueue mq) {
- Object objLock = this.mqLockTable.get(mq);
- if (null == objLock) {
- objLock = new Object();
- Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
+ return fetchLockObject(mq, -1); // -1 is the index used when the caller supplies no sharding key index
+ }
+
+ public Object fetchLockObject(final MessageQueue mq, final int shardingKeyIndex) { // returns the lock object for (mq, shardingKeyIndex), creating it on first use
+ ConcurrentMap<Integer, Object> objMap = this.mqLockTable.get(mq);
+ if (null == objMap) {
+ objMap = new ConcurrentHashMap<Integer, Object>(32);
+ ConcurrentMap<Integer, Object> prevObjMap = this.mqLockTable.putIfAbsent(mq, objMap);
+ if (prevObjMap != null) { // another thread inserted a map for this queue first
+ objMap = prevObjMap; // use the map that is actually in the table
+ }
+ }
+
+ Object lock = objMap.get(shardingKeyIndex);
+ if (null == lock) {
+ lock = new Object();
+ Object prevLock = objMap.putIfAbsent(shardingKeyIndex, lock); // non-null when another thread registered a lock first
if (prevLock != null) {
- objLock = prevLock;
+ lock = prevLock; // all callers must end up with the same lock instance
}
}
- return objLock;
+ return lock;
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageRequest.java
similarity index 53%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
copy to client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageRequest.java
index 5078c97..a808538 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageRequest.java
@@ -16,29 +16,8 @@
*/
package org.apache.rocketmq.client.impl.consumer;
-import java.util.List;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.message.MessageRequestMode;
-public interface ConsumeMessageService {
- void start();
-
- void shutdown(long awaitTerminateMillis);
-
- void updateCorePoolSize(int corePoolSize);
-
- void incCorePoolSize();
-
- void decCorePoolSize();
-
- int getCorePoolSize();
-
- ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
-
- void submitConsumeRequest(
- final List<MessageExt> msgs,
- final ProcessQueue processQueue,
- final MessageQueue messageQueue,
- final boolean dispathToConsume);
+public interface MessageRequest { // common type of the requests held in PullMessageService's request queue
+ MessageRequestMode getMessageRequestMode(); // PullRequest returns PULL, PopRequest returns POP
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java
new file mode 100644
index 0000000..0883a77
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopProcessQueue.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.protocol.body.PopProcessQueueInfo;
+
+/**
+ * Client-side bookkeeping for a message queue that is consumed through pop requests: a last-pop
+ * timestamp, a counter of messages still waiting to be acked, and a dropped flag.
+ */
+public class PopProcessQueue {
+
+ /** Idle threshold in milliseconds; read once from the system property, 120000 when the property is absent. */
+ private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
+
+ // volatile gives this field the same visibility as `dropped` and makes the 64-bit access atomic.
+ // NOTE(review): assumes the timestamp is read on a different thread from the one that sets it - confirm.
+ private volatile long lastPopTimestamp;
+ // The counter object is never replaced, only updated, so the reference is final.
+ private final AtomicInteger waitAckCounter = new AtomicInteger(0);
+ private volatile boolean dropped = false;
+
+ /**
+ * @return the last-pop timestamp; 0 until {@link #setLastPopTimestamp(long)} has been called
+ */
+ public long getLastPopTimestamp() {
+ return lastPopTimestamp;
+ }
+
+ public void setLastPopTimestamp(long lastPopTimestamp) {
+ this.lastPopTimestamp = lastPopTimestamp;
+ }
+
+ /**
+ * Adds {@code count} to the wait-ack counter.
+ */
+ public void incFoundMsg(int count) {
+ this.waitAckCounter.getAndAdd(count);
+ }
+
+ /**
+ * Decrements the wait-ack counter by one.
+ *
+ * @return the value before decrement.
+ */
+ public int ack() {
+ return this.waitAckCounter.getAndDecrement();
+ }
+
+ /**
+ * Adds {@code count} to the wait-ack counter.
+ * <p>
+ * NOTE(review): despite the name, nothing is negated here, so a caller that wants to lower the
+ * counter has to pass a negative value - confirm against the callers before changing this.
+ */
+ public void decFoundMsg(int count) {
+ this.waitAckCounter.addAndGet(count);
+ }
+
+ /**
+ * @return the current value of the wait-ack counter
+ */
+ public int getWaiAckMsgCount() {
+ return this.waitAckCounter.get();
+ }
+
+ public boolean isDropped() {
+ return dropped;
+ }
+
+ public void setDropped(boolean dropped) {
+ this.dropped = dropped;
+ }
+
+ /**
+ * Copies the wait-ack count, the dropped flag and the last-pop timestamp into {@code info}.
+ */
+ public void fillPopProcessQueueInfo(final PopProcessQueueInfo info) {
+ info.setWaitAckCount(getWaiAckMsgCount());
+ info.setDroped(isDropped());
+ info.setLastPopTimestamp(getLastPopTimestamp());
+ }
+
+ /**
+ * @return {@code true} when more than {@code PULL_MAX_IDLE_TIME} milliseconds have passed since the
+ * last-pop timestamp; because that timestamp starts at 0, this is also {@code true} for an
+ * instance whose timestamp has never been set
+ */
+ public boolean isPullExpired() {
+ return (System.currentTimeMillis() - this.lastPopTimestamp) > PULL_MAX_IDLE_TIME;
+ }
+
+ @Override
+ public String toString() {
+ return "PopProcessQueue[waitAckCounter:" + this.waitAckCounter.get()
+ + ", lastPopTimestamp:" + getLastPopTimestamp()
+ + ", drop:" + dropped + "]";
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopRequest.java
similarity index 64%
copy from client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
copy to client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopRequest.java
index 10aded0..c47f2d0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PopRequest.java
@@ -16,14 +16,17 @@
*/
package org.apache.rocketmq.client.impl.consumer;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageRequestMode;
-public class PullRequest {
+public class PopRequest implements MessageRequest {
+ private String topic;
private String consumerGroup;
private MessageQueue messageQueue;
- private ProcessQueue processQueue;
- private long nextOffset;
+ private PopProcessQueue popProcessQueue;
private boolean lockedFirst = false;
+ private int initMode = ConsumeInitMode.MAX;
public boolean isLockedFirst() {
return lockedFirst;
@@ -49,18 +52,35 @@ public class PullRequest {
this.messageQueue = messageQueue;
}
- public long getNextOffset() {
- return nextOffset;
+ public String getTopic() {
+ return topic;
}
- public void setNextOffset(long nextOffset) {
- this.nextOffset = nextOffset;
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public PopProcessQueue getPopProcessQueue() {
+ return popProcessQueue;
+ }
+
+ public void setPopProcessQueue(PopProcessQueue popProcessQueue) {
+ this.popProcessQueue = popProcessQueue;
+ }
+
+ public int getInitMode() {
+ return initMode;
+ }
+
+ public void setInitMode(int initMode) {
+ this.initMode = initMode;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
+ result = prime * result + ((topic == null) ? 0 : topic.hashCode());
result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode());
result = prime * result + ((messageQueue == null) ? 0 : messageQueue.hashCode());
return result;
@@ -74,31 +94,38 @@ public class PullRequest {
return false;
if (getClass() != obj.getClass())
return false;
- PullRequest other = (PullRequest) obj;
+
+ PopRequest other = (PopRequest) obj;
+
+ if (topic == null) {
+ if (other.topic != null)
+ return false;
+ } else if (!topic.equals(other.topic)) {
+ return false;
+ }
+
if (consumerGroup == null) {
if (other.consumerGroup != null)
return false;
} else if (!consumerGroup.equals(other.consumerGroup))
return false;
+
if (messageQueue == null) {
if (other.messageQueue != null)
return false;
- } else if (!messageQueue.equals(other.messageQueue))
+ } else if (!messageQueue.equals(other.messageQueue)) {
return false;
+ }
return true;
}
@Override
public String toString() {
- return "PullRequest [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue
- + ", nextOffset=" + nextOffset + "]";
- }
-
- public ProcessQueue getProcessQueue() {
- return processQueue;
+ return "PopRequest [topic=" + topic + ", consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue + "]";
}
- public void setProcessQueue(ProcessQueue processQueue) {
- this.processQueue = processQueue;
+ @Override
+ public MessageRequestMode getMessageRequestMode() {
+ return MessageRequestMode.POP;
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index cc42a9e..95b609e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -23,6 +23,7 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.consumer.PopCallback;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
@@ -37,16 +38,17 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.filter.ExpressionType;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class PullAPIWrapper {
@@ -269,4 +271,55 @@ public class PullAPIWrapper {
public void setDefaultBrokerId(long defaultBrokerId) {
this.defaultBrokerId = defaultBrokerId;
}
+
+
+ /**
+ * Sends an asynchronous pop request for {@code mq} to its master broker.
+ * <p>
+ * When the broker address is unknown, the route of {@code mq}'s topic is refreshed once and the
+ * lookup is repeated.
+ *
+ * @param mq queue to pop from; supplies the broker name, topic and queue id
+ * @param invisibleTime invisible time placed in the request header
+ * @param maxNums maximum number of messages, placed in the request header
+ * @param consumerGroup consumer group placed in the request header
+ * @param timeout request timeout in milliseconds; when {@code poll} is true it is also sent as the poll
+ * time and the client waits 10 seconds longer than this value
+ * @param popCallback receives the outcome of the asynchronous request
+ * @param poll when true, poll time and born time are set on the request
+ * @param initMode init mode placed in the request header
+ * @param order order flag placed in the request header
+ * @param expressionType expression type placed in the request header
+ * @param expression expression placed in the request header
+ * @throws MQClientException if the broker address is still unknown after the route refresh
+ * @throws RemotingException declared for the underlying remoting call
+ * @throws InterruptedException declared for the underlying remoting call
+ */
+ public void popAsync(MessageQueue mq, long invisibleTime, int maxNums, String consumerGroup,
+ long timeout, PopCallback popCallback, boolean poll, int initMode, boolean order, String expressionType, String expression)
+ throws MQClientException, RemotingException, InterruptedException {
+ FindBrokerResult master = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+ if (master == null) {
+ // Address not cached: refresh the route for this topic and look the master up one more time.
+ this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+ master = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+ }
+ if (master == null) {
+ throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+ }
+
+ final PopMessageRequestHeader header = new PopMessageRequestHeader();
+ header.setConsumerGroup(consumerGroup);
+ header.setTopic(mq.getTopic());
+ header.setQueueId(mq.getQueueId());
+ header.setMaxMsgNums(maxNums);
+ header.setInvisibleTime(invisibleTime);
+ header.setInitMode(initMode);
+ header.setExpType(expressionType);
+ header.setExp(expression);
+ header.setOrder(order);
+
+ long clientTimeout = timeout;
+ if (poll) {
+ header.setPollTime(timeout);
+ header.setBornTime(System.currentTimeMillis());
+ // Wait 10 s longer than the poll time so the client does not time out before the broker answers.
+ clientTimeout = timeout + 10 * 1000;
+ }
+ this.mQClientFactory.getMQClientAPIImpl().popMessageAsync(mq.getBrokerName(), master.getBrokerAddr(), header, clientTimeout, popCallback);
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
index bd46a58..9665c6d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
@@ -24,12 +24,14 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ServiceThread;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.InternalLogger;
public class PullMessageService extends ServiceThread {
private final InternalLogger log = ClientLogger.getLog();
- private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
+ private final LinkedBlockingQueue<MessageRequest> messageRequestQueue = new LinkedBlockingQueue<MessageRequest>();
+
private final MQClientInstance mQClientFactory;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactory() {
@@ -58,7 +60,28 @@ public class PullMessageService extends ServiceThread {
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
- this.pullRequestQueue.put(pullRequest);
+ this.messageRequestQueue.put(pullRequest);
+ } catch (InterruptedException e) {
+ log.error("executePullRequestImmediately pullRequestQueue.put", e);
+ }
+ }
+
+ /**
+ * Schedules {@code pullRequest} to be handed to {@code executePopPullRequestImmediately} after
+ * {@code timeDelay} milliseconds. If this service is already stopped, only a warning is logged.
+ *
+ * @param pullRequest the pop request to submit later
+ * @param timeDelay delay in milliseconds
+ */
+ public void executePopPullRequestLater(final PopRequest pullRequest, final long timeDelay) {
+ if (isStopped()) {
+ log.warn("PullMessageServiceScheduledThread has shutdown");
+ return;
+ }
+ final Runnable submitTask = new Runnable() {
+ @Override
+ public void run() {
+ PullMessageService.this.executePopPullRequestImmediately(pullRequest);
+ }
+ };
+ this.scheduledExecutorService.schedule(submitTask, timeDelay, TimeUnit.MILLISECONDS);
+ }
+
+ public void executePopPullRequestImmediately(final PopRequest pullRequest) {
+ try {
+ this.messageRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
@@ -86,14 +109,28 @@ public class PullMessageService extends ServiceThread {
}
}
+ /**
+ * Hands {@code popRequest} to the consumer registered for its consumer group, or logs a warning and
+ * drops the request when no such consumer is found.
+ */
+ private void popMessage(final PopRequest popRequest) {
+ final MQConsumerInner matched = this.mQClientFactory.selectConsumer(popRequest.getConsumerGroup());
+ if (null == matched) {
+ log.warn("No matched consumer for the PopRequest {}, drop it", popRequest);
+ return;
+ }
+ // The consumer is cast without a type check.
+ ((DefaultMQPushConsumerImpl) matched).popMessage(popRequest);
+ }
+
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
- PullRequest pullRequest = this.pullRequestQueue.take();
- this.pullMessage(pullRequest);
+ MessageRequest messageRequest = this.messageRequestQueue.take();
+ if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) {
+ this.popMessage((PopRequest)messageRequest);
+ } else {
+ this.pullMessage((PullRequest)messageRequest);
+ }
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
index 10aded0..71d1fdb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
@@ -17,8 +17,9 @@
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageRequestMode;
-public class PullRequest {
+public class PullRequest implements MessageRequest {
private String consumerGroup;
private MessageQueue messageQueue;
private ProcessQueue processQueue;
@@ -101,4 +102,9 @@ public class PullRequest {
public void setProcessQueue(ProcessQueue processQueue) {
this.processQueue = processQueue;
}
+
+ @Override
+ public MessageRequestMode getMessageRequestMode() {
+ return MessageRequestMode.PULL; // a PullRequest always reports the PULL mode
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index b8972a9..7e78c9b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -31,18 +31,26 @@ import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
public abstract class RebalanceImpl {
protected static final InternalLogger log = ClientLogger.getLog();
+
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
+ protected final ConcurrentMap<MessageQueue, PopProcessQueue> popProcessQueueTable = new ConcurrentHashMap<MessageQueue, PopProcessQueue>(64);
+
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
new ConcurrentHashMap<String, Set<MessageQueue>>();
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
@@ -51,6 +59,11 @@ public abstract class RebalanceImpl {
protected MessageModel messageModel;
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
protected MQClientInstance mQClientFactory;
+ private static final int TIMEOUT_CHECK_TIMES = 3;
+ private static final int QUERY_ASSIGNMENT_TIMEOUT = 3000;
+
+ private Map<String, String> topicBrokerRebalance = new ConcurrentHashMap<String, String>();
+ private Map<String, String> topicClientRebalance = new ConcurrentHashMap<String, String>();
public RebalanceImpl(String consumerGroup, MessageModel messageModel,
AllocateMessageQueueStrategy allocateMessageQueueStrategy,
@@ -88,8 +101,9 @@ public abstract class RebalanceImpl {
final String brokerName = entry.getKey();
final Set<MessageQueue> mqs = entry.getValue();
- if (mqs.isEmpty())
+ if (mqs.isEmpty()) {
continue;
+ }
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
@@ -117,7 +131,15 @@ public abstract class RebalanceImpl {
private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();
- for (MessageQueue mq : this.processQueueTable.keySet()) {
+
+ for (Map.Entry<MessageQueue, ProcessQueue> entry : this.processQueueTable.entrySet()) {
+ MessageQueue mq = entry.getKey();
+ ProcessQueue pq = entry.getValue();
+
+ if (pq.isDropped()) {
+ continue;
+ }
+
Set<MessageQueue> mqs = result.get(mq.getBrokerName());
if (null == mqs) {
mqs = new HashSet<MessageQueue>();
@@ -150,10 +172,7 @@ public abstract class RebalanceImpl {
}
boolean lockOK = lockedMq.contains(mq);
- log.info("the message queue lock {}, {} {}",
- lockOK ? "OK" : "Failed",
- this.consumerGroup,
- mq);
+ log.info("message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
@@ -172,8 +191,9 @@ public abstract class RebalanceImpl {
final String brokerName = entry.getKey();
final Set<MessageQueue> mqs = entry.getValue();
- if (mqs.isEmpty())
+ if (mqs.isEmpty()) {
continue;
+ }
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
@@ -213,29 +233,86 @@ public abstract class RebalanceImpl {
}
}
- public void doRebalance(final boolean isOrder) {
+ public boolean clientRebalance(String topic) { // this implementation ignores the topic
+ return true; // doRebalance only tries the broker-side assignment path when this returns false
+ }
+
+ /**
+ * Rebalances every subscribed topic and then calls {@code truncateMessageQueueNotMyTopic()}.
+ * <p>
+ * For each topic the broker-side result is used when {@code clientRebalance(topic)} is false and
+ * {@code tryQueryAssignment(topic)} is true; otherwise {@code rebalanceByTopic} is used.
+ *
+ * @param isOrder passed through to the per-topic rebalance
+ * @return {@code true} only if every topic reported balanced; an exception on a topic that does not start with
+ * {@code MixAll.RETRY_GROUP_TOPIC_PREFIX} also makes the result {@code false}
+ */
+ public boolean doRebalance(final boolean isOrder) {
+ boolean balanced = true;
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
- this.rebalanceByTopic(topic, isOrder);
+ // Accumulate with &= so an unbalanced topic is not masked by a later balanced one.
+ // On booleans &= does not short-circuit, so the rebalance call always runs.
+ if (!clientRebalance(topic) && tryQueryAssignment(topic)) {
+ balanced &= this.getRebalanceResultFromBroker(topic, isOrder);
+ } else {
+ balanced &= this.rebalanceByTopic(topic, isOrder);
+ }
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("rebalanceByTopic Exception", e);
+ log.warn("rebalance Exception", e);
+ balanced = false;
}
}
}
}
this.truncateMessageQueueNotMyTopic();
+
+ return balanced;
+ }
+
+ /**
+ * Decides whether {@code topic} should use the broker-side assignment path, probing the broker on first use.
+ * <p>
+ * A cached decision is reused without another query: topics in {@code topicClientRebalance} yield
+ * {@code false}, topics in {@code topicBrokerRebalance} yield {@code true}. Otherwise
+ * {@code queryAssignment} is called up to {@code TIMEOUT_CHECK_TIMES} times, each attempt with a larger
+ * timeout. A successful call caches the topic in {@code topicBrokerRebalance}; if every attempt
+ * timed out the topic is cached in {@code topicClientRebalance}.
+ *
+ * @param topic topic to decide for
+ * @return {@code false} when the topic is, or has just been, marked for client-side rebalance;
+ * {@code true} otherwise, including when probing stopped on a non-timeout error (nothing is cached then)
+ */
+ private boolean tryQueryAssignment(String topic) {
+ if (topicClientRebalance.containsKey(topic)) {
+ return false;
+ }
+
+ if (topicBrokerRebalance.containsKey(topic)) {
+ return true;
+ }
+
+ String strategyName = allocateMessageQueueStrategy != null ? allocateMessageQueueStrategy.getName() : null;
+
+ boolean success = false;
+ int timeOut = 0;
+ for (int attempt = 1; attempt <= TIMEOUT_CHECK_TIMES; attempt++) {
+ try {
+ // Only whether the call succeeds matters here; the returned assignment is not used.
+ mQClientFactory.queryAssignment(topic, consumerGroup,
+ strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT / TIMEOUT_CHECK_TIMES * attempt);
+ success = true;
+ break;
+ } catch (Throwable t) {
+ if (t instanceof RemotingTimeoutException) {
+ timeOut++;
+ } else {
+ log.error("tryQueryAssignment error.", t);
+ break;
+ }
+ }
+ }
+
+ if (success) {
+ topicBrokerRebalance.put(topic, topic);
+ return true;
+ } else {
+ if (timeOut >= TIMEOUT_CHECK_TIMES) {
+ // if never success before and timeout exceed TIMEOUT_CHECK_TIMES, force client rebalance
+ topicClientRebalance.put(topic, topic);
+ return false;
+ } else {
+ return true;
+ }
+ }
+ }
public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
return subscriptionInner;
}
- private void rebalanceByTopic(final String topic, final boolean isOrder) {
+ private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
+ boolean balanced = true;
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
@@ -243,13 +320,12 @@ public abstract class RebalanceImpl {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
- log.info("messageQueueChanged {} {} {} {}",
- consumerGroup,
- topic,
- mqSet,
- mqSet);
+ log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet);
}
+
+ balanced = mqSet.equals(getWorkingMessageQueue(topic));
} else {
+ this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
@@ -259,6 +335,7 @@ public abstract class RebalanceImpl {
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
@@ -284,9 +361,8 @@ public abstract class RebalanceImpl {
mqAll,
cidAll);
} catch (Throwable e) {
- log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
- e);
- return;
+ log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);
+ return false;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
@@ -297,17 +373,89 @@ public abstract class RebalanceImpl {
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
- "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
+ "client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
+
+ balanced = allocateResultSet.equals(getWorkingMessageQueue(topic));
}
break;
}
default:
break;
}
+
+ return balanced;
+ }
+
+ /**
+ * Asks the broker for this client's queue assignment of {@code topic} and applies it locally.
+ *
+ * @param topic topic to rebalance
+ * @param isOrder whether queues must be locked before they are added (orderly consuming)
+ * @return {@code true} when the assigned queues equal the queues currently being worked on, or when the
+ * message model is neither BROADCASTING nor CLUSTERING; {@code false} when the query failed or
+ * returned {@code null}
+ */
+ private boolean getRebalanceResultFromBroker(final String topic, final boolean isOrder) {
+ String strategyName;
+ switch (messageModel) {
+ case BROADCASTING:
+ // no allocation strategy is sent for broadcasting
+ strategyName = null;
+ break;
+ case CLUSTERING:
+ strategyName = this.allocateMessageQueueStrategy.getName();
+ break;
+ default:
+ return true;
+ }
+ Set<MessageQueueAssignment> messageQueueAssignments;
+ try {
+ messageQueueAssignments = this.mQClientFactory.queryAssignment(topic, consumerGroup,
+ strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT);
+ } catch (Exception e) {
+ // the exception is passed as the trailing argument so the logger prints its stack trace;
+ // it must not have a placeholder of its own, otherwise a literal "{}" ends up in the message
+ log.error("allocate message queue exception. strategy name: {}", strategyName, e);
+ return false;
+ }
+
+ // null means invalid result, we should skip the update logic
+ if (messageQueueAssignments == null) {
+ return false;
+ }
+ Set<MessageQueue> mqSet = new HashSet<MessageQueue>();
+ for (MessageQueueAssignment messageQueueAssignment : messageQueueAssignments) {
+ if (messageQueueAssignment.getMessageQueue() != null) {
+ mqSet.add(messageQueueAssignment.getMessageQueue());
+ }
+ }
+ // the full queue set is only known for broadcasting (it equals the assignment); it stays null otherwise
+ Set<MessageQueue> mqAll = null;
+ if (messageModel == MessageModel.BROADCASTING) {
+ mqAll = mqSet;
+ }
+ boolean changed = this.updateMessageQueueAssignment(topic, messageQueueAssignments, isOrder);
+ if (changed) {
+ log.info("broker rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, assignmentSet={}",
+ strategyName, consumerGroup, topic, this.mQClientFactory.getClientId(), messageQueueAssignments);
+ this.messageQueueChanged(topic, mqAll, mqSet);
+ }
+
+ return mqSet.equals(getWorkingMessageQueue(topic));
+ }
+
+ /**
+ * Collects the queues of {@code topic} that currently have a process queue which is not dropped,
+ * looking at both the pull table and the pop table.
+ */
+ private Set<MessageQueue> getWorkingMessageQueue(String topic) {
+ Set<MessageQueue> working = new HashSet<MessageQueue>();
+
+ // queues consumed in pull mode
+ for (Entry<MessageQueue, ProcessQueue> pullEntry : this.processQueueTable.entrySet()) {
+ MessageQueue queue = pullEntry.getKey();
+ if (!queue.getTopic().equals(topic) || pullEntry.getValue().isDropped()) {
+ continue;
+ }
+ working.add(queue);
+ }
+
+ // queues consumed in pop mode
+ for (Entry<MessageQueue, PopProcessQueue> popEntry : this.popProcessQueueTable.entrySet()) {
+ MessageQueue queue = popEntry.getKey();
+ if (!queue.getTopic().equals(topic) || popEntry.getValue().isDropped()) {
+ continue;
+ }
+ working.add(queue);
+ }
+
+ return working;
}
private void truncateMessageQueueNotMyTopic() {
@@ -323,12 +471,40 @@ public abstract class RebalanceImpl {
}
}
}
+
+ for (MessageQueue mq : this.popProcessQueueTable.keySet()) {
+ if (!subTable.containsKey(mq.getTopic())) {
+
+ PopProcessQueue pq = this.popProcessQueueTable.remove(mq);
+ if (pq != null) {
+ pq.setDropped(true);
+ log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary pop mq, {}", consumerGroup, mq);
+ }
+ }
+ }
+
+ Iterator<Map.Entry<String, String>> clientIter = topicClientRebalance.entrySet().iterator();
+ while (clientIter.hasNext()) {
+ if (!subTable.containsKey(clientIter.next().getKey())) {
+ clientIter.remove();
+ }
+ }
+
+ Iterator<Map.Entry<String, String>> brokerIter = topicBrokerRebalance.entrySet().iterator();
+ while (brokerIter.hasNext()) {
+ if (!subTable.containsKey(brokerIter.next().getKey())) {
+ brokerIter.remove();
+ }
+ }
}
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
+ Map<MessageQueue, MessageQueue> upgradeMqTable = new HashMap<MessageQueue, MessageQueue>();
+ // drop process queues no longer belong me
+ HashMap<MessageQueue, ProcessQueue> removeQueueMap = new HashMap<MessageQueue, ProcessQueue>(this.processQueueTable.size());
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
@@ -338,41 +514,42 @@ public abstract class RebalanceImpl {
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
pq.setDropped(true);
- if (this.removeUnnecessaryMessageQueue(mq, pq)) {
- it.remove();
- changed = true;
- log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
- }
- } else if (pq.isPullExpired()) {
- switch (this.consumeType()) {
- case CONSUME_ACTIVELY:
- break;
- case CONSUME_PASSIVELY:
- pq.setDropped(true);
- if (this.removeUnnecessaryMessageQueue(mq, pq)) {
- it.remove();
- changed = true;
- log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
- consumerGroup, mq);
- }
- break;
- default:
- break;
- }
+ removeQueueMap.put(mq, pq);
+ } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
+ pq.setDropped(true);
+ removeQueueMap.put(mq, pq);
+ log.error("[BUG]doRebalance, {}, try remove unnecessary mq, {}, because pull is pause, so try to fixed it",
+ consumerGroup, mq);
}
}
}
+ // remove message queues no longer belong me
+ for (Entry<MessageQueue, ProcessQueue> entry : removeQueueMap.entrySet()) {
+ MessageQueue mq = entry.getKey();
+ ProcessQueue pq = entry.getValue();
+
+ if (this.removeUnnecessaryMessageQueue(mq, pq)) {
+ this.processQueueTable.remove(mq);
+ changed = true;
+ log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
+ }
+ }
+
+ // add new message queue
+ boolean allMQLocked = true;
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
+ allMQLocked = false;
continue;
}
this.removeDirtyOffset(mq);
- ProcessQueue pq = new ProcessQueue();
+ ProcessQueue pq = createProcessQueue(topic);
+ pq.setLocked(true);
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
@@ -392,9 +569,194 @@ public abstract class RebalanceImpl {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
+
+ }
+
+ if (!allMQLocked) {
+ mQClientFactory.rebalanceLater(500);
+ }
+
+ this.dispatchPullRequest(pullRequestList, 500);
+
+ return changed;
+ }
+
+ /**
+ * Applies a broker-computed assignment for {@code topic}: splits it into push and pop queues, keeps the
+ * pop retry topic subscription in line with the mode in use, drops process queues that are no longer
+ * assigned, and creates and dispatches requests for newly assigned queues.
+ *
+ * @param topic topic the assignment belongs to
+ * @param assignments assignment returned by the broker; entries without a message queue are ignored
+ * @param isOrder when {@code true} a new push queue is only added after it has been locked
+ * @return {@code true} if any process queue was added or removed
+ */
+ private boolean updateMessageQueueAssignment(final String topic, final Set<MessageQueueAssignment> assignments,
+ final boolean isOrder) {
+ boolean changed = false;
+
+ Map<MessageQueue, MessageQueueAssignment> mq2PushAssignment = new HashMap<MessageQueue, MessageQueueAssignment>();
+ Map<MessageQueue, MessageQueueAssignment> mq2PopAssignment = new HashMap<MessageQueue, MessageQueueAssignment>();
+ for (MessageQueueAssignment assignment : assignments) {
+ MessageQueue messageQueue = assignment.getMessageQueue();
+ if (messageQueue == null) {
+ continue;
+ }
+ if (MessageRequestMode.POP == assignment.getMode()) {
+ mq2PopAssignment.put(messageQueue, assignment);
+ } else {
+ mq2PushAssignment.put(messageQueue, assignment);
+ }
+ }
+
+ if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ if (mq2PopAssignment.isEmpty() && !mq2PushAssignment.isEmpty()) {
+ //pop switch to push
+ //subscribe pop retry topic
+ try {
+ final String retryTopic = KeyBuilder.buildPopRetryTopic(topic, getConsumerGroup());
+ SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
+ getSubscriptionInner().put(retryTopic, subscriptionData);
+ } catch (Exception e) {
+ // best effort: the rebalance continues, but the failure is no longer silent
+ log.warn("doRebalance, {}, subscribe pop retry topic failed, topic={}", consumerGroup, topic, e);
+ }
+
+ } else if (!mq2PopAssignment.isEmpty() && mq2PushAssignment.isEmpty()) {
+ //push switch to pop
+ //unsubscribe pop retry topic
+ try {
+ final String retryTopic = KeyBuilder.buildPopRetryTopic(topic, getConsumerGroup());
+ getSubscriptionInner().remove(retryTopic);
+ } catch (Exception e) {
+ // best effort: the rebalance continues, but the failure is no longer silent
+ log.warn("doRebalance, {}, unsubscribe pop retry topic failed, topic={}", consumerGroup, topic, e);
+ }
+
+ }
+ }
+
+ {
+ // drop process queues no longer belong me
+ HashMap<MessageQueue, ProcessQueue> removeQueueMap = new HashMap<MessageQueue, ProcessQueue>(this.processQueueTable.size());
+ Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, ProcessQueue> next = it.next();
+ MessageQueue mq = next.getKey();
+ ProcessQueue pq = next.getValue();
+
+ if (mq.getTopic().equals(topic)) {
+ if (!mq2PushAssignment.containsKey(mq)) {
+ pq.setDropped(true);
+ removeQueueMap.put(mq, pq);
+ } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
+ pq.setDropped(true);
+ removeQueueMap.put(mq, pq);
+ log.error("[BUG]doRebalance, {}, try remove unnecessary mq, {}, because pull is pause, so try to fixed it",
+ consumerGroup, mq);
+ }
+ }
+ }
+ // remove message queues no longer belong me
+ for (Entry<MessageQueue, ProcessQueue> entry : removeQueueMap.entrySet()) {
+ MessageQueue mq = entry.getKey();
+ ProcessQueue pq = entry.getValue();
+
+ if (this.removeUnnecessaryMessageQueue(mq, pq)) {
+ this.processQueueTable.remove(mq);
+ changed = true;
+ log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
+ }
+ }
}
- this.dispatchPullRequest(pullRequestList);
+ {
+ // drop pop process queues that are no longer assigned or whose pop has expired
+ HashMap<MessageQueue, PopProcessQueue> removeQueueMap = new HashMap<MessageQueue, PopProcessQueue>(this.popProcessQueueTable.size());
+ Iterator<Entry<MessageQueue, PopProcessQueue>> it = this.popProcessQueueTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, PopProcessQueue> next = it.next();
+ MessageQueue mq = next.getKey();
+ PopProcessQueue pq = next.getValue();
+
+ if (mq.getTopic().equals(topic)) {
+ if (!mq2PopAssignment.containsKey(mq)) {
+ //the queue is no longer your assignment
+ pq.setDropped(true);
+ removeQueueMap.put(mq, pq);
+ } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
+ pq.setDropped(true);
+ removeQueueMap.put(mq, pq);
+ log.error("[BUG]doRebalance, {}, try remove unnecessary pop mq, {}, because pop is pause, so try to fixed it",
+ consumerGroup, mq);
+ }
+ }
+ }
+ // remove message queues no longer belong me
+ for (Entry<MessageQueue, PopProcessQueue> entry : removeQueueMap.entrySet()) {
+ MessageQueue mq = entry.getKey();
+ PopProcessQueue pq = entry.getValue();
+
+ if (this.removeUnnecessaryPopMessageQueue(mq, pq)) {
+ this.popProcessQueueTable.remove(mq);
+ changed = true;
+ log.info("doRebalance, {}, remove unnecessary pop mq, {}", consumerGroup, mq);
+ }
+ }
+ }
+
+ {
+ // add new message queue
+ boolean allMQLocked = true;
+ List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
+ for (MessageQueue mq : mq2PushAssignment.keySet()) {
+ if (!this.processQueueTable.containsKey(mq)) {
+ if (isOrder && !this.lock(mq)) {
+ log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
+ allMQLocked = false;
+ continue;
+ }
+
+ this.removeDirtyOffset(mq);
+ ProcessQueue pq = createProcessQueue();
+ pq.setLocked(true);
+ long nextOffset = this.computePullFromWhere(mq);
+ if (nextOffset >= 0) {
+ ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
+ if (pre != null) {
+ log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
+ } else {
+ log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
+ PullRequest pullRequest = new PullRequest();
+ pullRequest.setConsumerGroup(consumerGroup);
+ pullRequest.setNextOffset(nextOffset);
+ pullRequest.setMessageQueue(mq);
+ pullRequest.setProcessQueue(pq);
+ pullRequestList.add(pullRequest);
+ changed = true;
+ }
+ } else {
+ log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
+ }
+ }
+ }
+
+ // a queue that could not be locked was skipped above; schedule another rebalance to retry it
+ if (!allMQLocked) {
+ mQClientFactory.rebalanceLater(500);
+ }
+ this.dispatchPullRequest(pullRequestList, 500);
+ }
+
+ {
+ // add new pop message queue
+ List<PopRequest> popRequestList = new ArrayList<PopRequest>();
+ for (MessageQueue mq : mq2PopAssignment.keySet()) {
+ if (!this.popProcessQueueTable.containsKey(mq)) {
+ PopProcessQueue pq = createPopProcessQueue();
+ PopProcessQueue pre = this.popProcessQueueTable.putIfAbsent(mq, pq);
+ if (pre != null) {
+ log.info("doRebalance, {}, mq pop already exists, {}", consumerGroup, mq);
+ } else {
+ log.info("doRebalance, {}, add a new pop mq, {}", consumerGroup, mq);
+ PopRequest popRequest = new PopRequest();
+ popRequest.setTopic(topic);
+ popRequest.setConsumerGroup(consumerGroup);
+ popRequest.setMessageQueue(mq);
+ popRequest.setPopProcessQueue(pq);
+ popRequest.setInitMode(getConsumeInitMode());
+ popRequestList.add(popRequest);
+ changed = true;
+ }
+ }
+ }
+
+ this.dispatchPopPullRequest(popRequestList, 500);
+ }
return changed;
}
@@ -404,13 +766,27 @@ public abstract class RebalanceImpl {
public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq);
+ /**
+ * Hook consulted before a pop process queue that was marked dropped is removed from
+ * {@code popProcessQueueTable}. This default implementation always allows the removal.
+ *
+ * @param mq the message queue about to be removed
+ * @param pq its pop process queue
+ * @return {@code true} if the queue may be removed
+ */
+ public boolean removeUnnecessaryPopMessageQueue(final MessageQueue mq, final PopProcessQueue pq) {
+ return true;
+ }
+
public abstract ConsumeType consumeType();
public abstract void removeDirtyOffset(final MessageQueue mq);
public abstract long computePullFromWhere(final MessageQueue mq);
- public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
+ /** Returns the init mode that is set on every newly created pop request. */
+ public abstract int getConsumeInitMode();
+
+ /**
+ * Hands the pull requests created for newly added queues to the consumer.
+ *
+ * @param pullRequestList requests to dispatch
+ * @param delay the rebalance code in this class always passes 500; presumably milliseconds - TODO confirm
+ */
+ public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay);
+
+ /**
+ * Hands the pop requests created for newly added pop queues to the consumer.
+ *
+ * @param pullRequestList requests to dispatch
+ * @param delay the rebalance code in this class always passes 500; presumably milliseconds - TODO confirm
+ */
+ public abstract void dispatchPopPullRequest(final List<PopRequest> pullRequestList, final long delay);
+
+ /** Creates the process queue for a newly assigned queue when applying a broker-side assignment. */
+ public abstract ProcessQueue createProcessQueue();
+
+ /** Creates the pop process queue that is stored in {@code popProcessQueueTable} for a newly assigned pop queue. */
+ public abstract PopProcessQueue createPopProcessQueue();
+
+ /** Creates the process queue for a newly assigned queue of {@code topicName} during a client-side rebalance. */
+ public abstract ProcessQueue createProcessQueue(String topicName);
public void removeProcessQueue(final MessageQueue mq) {
ProcessQueue prev = this.processQueueTable.remove(mq);
@@ -426,6 +802,10 @@ public abstract class RebalanceImpl {
return processQueueTable;
}
+ /** Returns the live table of pop process queues keyed by message queue (the internal map, not a copy). */
+ public ConcurrentMap<MessageQueue, PopProcessQueue> getPopProcessQueueTable() {
+ return popProcessQueueTable;
+ }
+
public ConcurrentMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
return topicSubscribeInfoTable;
}
@@ -470,5 +850,13 @@ public abstract class RebalanceImpl {
}
this.processQueueTable.clear();
+
+ Iterator<Entry<MessageQueue, PopProcessQueue>> popIt = this.popProcessQueueTable.entrySet().iterator();
+ while (popIt.hasNext()) {
+ Entry<MessageQueue, PopProcessQueue> next = popIt.next();
+ next.getValue().setDropped(true);
+ }
+ this.popProcessQueueTable.clear();
}
+
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 9d1ea74..c8f5cf1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -139,7 +139,30 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
}
+ /** Not supported by this implementation; always throws {@link UnsupportedOperationException}. */
@Override
- public void dispatchPullRequest(List<PullRequest> pullRequestList) {
+ public int getConsumeInitMode() {
+ throw new UnsupportedOperationException("no initMode for Pull");
}
+ /** Intentionally a no-op in this implementation. */
+ @Override
+ public void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay) {
+ }
+
+ /** Intentionally a no-op in this implementation. */
+ @Override
+ public void dispatchPopPullRequest(List<PopRequest> pullRequestList, long delay) {
+
+ }
+
+ /** Returns a fresh, plain {@link ProcessQueue}. */
+ @Override
+ public ProcessQueue createProcessQueue() {
+ return new ProcessQueue();
+ }
+
+ /**
+ * Always returns {@code null}.
+ * NOTE(review): the base class stores this value with {@code putIfAbsent} on a concurrent map, which
+ * would presumably reject {@code null}; looks like pop assignments are not expected here - confirm.
+ */
+ @Override
+ public PopProcessQueue createPopProcessQueue() {
+ return null;
+ }
+
+ /** The topic name is ignored; delegates to {@link #createProcessQueue()}. */
+ @Override
+ public ProcessQueue createProcessQueue(String topicName) {
+ return createProcessQueue();
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
index 9dd408c..a68fb04 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
@@ -74,6 +74,30 @@ public class RebalancePullImpl extends RebalanceImpl {
}
+ /** Not supported by this implementation; always throws {@link UnsupportedOperationException}. */
@Override
- public void dispatchPullRequest(List<PullRequest> pullRequestList) {
+ public int getConsumeInitMode() {
+ throw new UnsupportedOperationException("no initMode for Pull");
}
+
+ /** Intentionally a no-op in this implementation. */
+ @Override
+ public void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay) {
+ }
+
+ /** Intentionally a no-op in this implementation. */
+ @Override
+ public void dispatchPopPullRequest(final List<PopRequest> pullRequestList, final long delay) {
+ }
+
+ /** Returns a fresh, plain {@link ProcessQueue}. */
+ @Override
+ public ProcessQueue createProcessQueue() {
+ return new ProcessQueue();
+ }
+
+ /**
+ * Always returns {@code null}.
+ * NOTE(review): the base class stores this value with {@code putIfAbsent} on a concurrent map, which
+ * would presumably reject {@code null}; looks like pop assignments are not expected here - confirm.
+ */
+ @Override
+ public PopProcessQueue createPopProcessQueue() {
+ return null;
+ }
+
+ /** The topic name is ignored; delegates to {@link #createProcessQueue()}. */
+ @Override
+ public ProcessQueue createProcessQueue(String topicName) {
+ return createProcessQueue();
+ }
+
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 9582391..88c7c1e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -36,6 +37,7 @@ public class RebalancePushImpl extends RebalanceImpl {
private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
+
public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
this(null, null, null, null, defaultMQPushConsumerImpl);
}
@@ -110,6 +112,16 @@ public class RebalancePushImpl extends RebalanceImpl {
return true;
}
+ /**
+ * Returns {@code true} when the push consumer is configured for client rebalance or consumes orderly.
+ * The topic argument is not consulted.
+ * NOTE(review): presumably the base class uses this to choose between client-side and broker-side
+ * rebalancing - confirm against the caller.
+ */
+ @Override
+ public boolean clientRebalance(String topic) {
+ // POPTODO order pop consume not implement yet
+ return defaultMQPushConsumerImpl.getDefaultMQPushConsumer().isClientRebalance() || defaultMQPushConsumerImpl.isConsumeOrderly();
+ }
+
+ /**
+ * Always allows a dropped pop process queue to be removed.
+ *
+ * @return {@code true} unconditionally
+ */
+ @Override
+ public boolean removeUnnecessaryPopMessageQueue(final MessageQueue mq, final PopProcessQueue pq) {
+ return true;
+ }
+
private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
if (pq.hasTempMessage()) {
@@ -212,10 +224,48 @@ public class RebalancePushImpl extends RebalanceImpl {
}
+ /**
+ * Maps the consumer's {@code ConsumeFromWhere} setting onto a pop init mode:
+ * {@code CONSUME_FROM_FIRST_OFFSET} gives {@code ConsumeInitMode.MIN}, anything else gives
+ * {@code ConsumeInitMode.MAX}.
+ */
@Override
- public void dispatchPullRequest(List<PullRequest> pullRequestList) {
+ public int getConsumeInitMode() {
+ final boolean fromFirstOffset = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
+ == this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
+ return fromFirstOffset ? ConsumeInitMode.MIN : ConsumeInitMode.MAX;
+ }
+
+ /**
+ * Submits every pull request to the push consumer: immediately when {@code delay <= 0},
+ * otherwise through the delayed path with {@code delay} passed along unchanged.
+ *
+ * @param pullRequestList requests created for newly added queues
+ * @param delay delay handed to {@code executePullRequestLater}
+ */
+ @Override
+ public void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay) {
for (PullRequest pullRequest : pullRequestList) {
- this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
- log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
+ if (delay <= 0) {
+ this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
+ } else {
+ this.defaultMQPushConsumerImpl.executePullRequestLater(pullRequest, delay);
+ }
}
}
+
+ /**
+ * Submits every pop request to the push consumer: immediately when {@code delay <= 0},
+ * otherwise through the delayed path with {@code delay} passed along unchanged.
+ */
+ @Override
+ public void dispatchPopPullRequest(final List<PopRequest> pullRequestList, final long delay) {
+ // delay is final, so the decision is the same for every request
+ final boolean runNow = delay <= 0;
+ for (PopRequest popRequest : pullRequestList) {
+ if (runNow) {
+ this.defaultMQPushConsumerImpl.executePopPullRequestImmediately(popRequest);
+ continue;
+ }
+ this.defaultMQPushConsumerImpl.executePopPullRequestLater(popRequest, delay);
+ }
+ }
+
+ /** Returns a fresh, plain {@link ProcessQueue}. */
+ @Override
+ public ProcessQueue createProcessQueue() {
+ return new ProcessQueue();
+ }
+
+ /** The topic name is ignored; delegates to {@link #createProcessQueue()}. */
+ @Override
+ public ProcessQueue createProcessQueue(String topicName) {
+ return createProcessQueue();
+ }
+
+ /** Returns a fresh {@link PopProcessQueue}. */
+ @Override
+ public PopProcessQueue createPopProcessQueue() {
+ return new PopProcessQueue();
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 81e6d84..619cfc2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
@@ -64,26 +63,29 @@ import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
public class MQClientInstance {
private final static long LOCK_TIMEOUT_MILLIS = 3000;
private final InternalLogger log = ClientLogger.getLog();
@@ -964,6 +966,19 @@ public class MQClientInstance {
this.adminExtTable.remove(group);
}
+ /**
+ * Wakes up the rebalance service, either right away or after a delay.
+ *
+ * @param delayMillis delay in milliseconds; zero or negative means wake up immediately
+ */
+ public void rebalanceLater(long delayMillis) {
+ if (delayMillis > 0) {
+ final Runnable wakeupTask = new Runnable() {
+ @Override
+ public void run() {
+ MQClientInstance.this.rebalanceService.wakeup();
+ }
+ };
+ this.scheduledExecutorService.schedule(wakeupTask, delayMillis, TimeUnit.MILLISECONDS);
+ return;
+ }
+
+ this.rebalanceService.wakeup();
+ }
+
public void rebalanceImmediately() {
this.rebalanceService.wakeup();
}
@@ -1091,6 +1106,22 @@ public class MQClientInstance {
return null;
}
+ /**
+ * Queries a broker serving {@code topic} for this client's queue assignment.
+ * If no broker address is known for the topic, the route is refreshed from the name server once and
+ * the lookup is retried.
+ *
+ * @return the assignment reported by the broker, or {@code null} when no broker address could be found
+ */
+ public Set<MessageQueueAssignment> queryAssignment(final String topic, final String consumerGroup, final String strategyName, final MessageModel messageModel, int timeout)
+ throws RemotingException, InterruptedException, MQBrokerException {
+ String addr = this.findBrokerAddrByTopic(topic);
+ if (addr == null) {
+ this.updateTopicRouteInfoFromNameServer(topic);
+ addr = this.findBrokerAddrByTopic(topic);
+ }
+
+ if (addr == null) {
+ return null;
+ }
+
+ return this.mQClientAPIImpl.queryAssignment(addr, topic, consumerGroup, clientId, strategyName,
+ messageModel, timeout);
+ }
+
public String findBrokerAddrByTopic(final String topic) {
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null) {
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 5b7e635..eee8726 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -20,8 +20,10 @@ package org.apache.rocketmq.client.consumer;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
-import java.util.*;
-
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
@@ -52,11 +54,8 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
@@ -70,9 +69,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DefaultLitePullConsumerImpl.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.class)
public class DefaultLitePullConsumerTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@@ -94,13 +91,16 @@ public class DefaultLitePullConsumerTest {
@Before
public void init() throws Exception {
- PowerMockito.suppress(PowerMockito.method(DefaultLitePullConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
field.setAccessible(true);
RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
field = RebalanceService.class.getDeclaredField("waitInterval");
field.setAccessible(true);
field.set(rebalanceService, 100);
+
+ field = DefaultLitePullConsumerImpl.class.getDeclaredField("doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged");
+ field.setAccessible(true);
+ field.set(null, true);
}
@Test
@@ -183,6 +183,7 @@ public class DefaultLitePullConsumerTest {
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
+ litePullConsumer.pause(Collections.singletonList(messageQueue));
long offset = litePullConsumer.committed(messageQueue);
litePullConsumer.seek(messageQueue, offset);
Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
@@ -199,6 +200,7 @@ public class DefaultLitePullConsumerTest {
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
+ litePullConsumer.pause(Collections.singletonList(messageQueue));
litePullConsumer.seekToBegin(messageQueue);
Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
field.setAccessible(true);
@@ -214,6 +216,7 @@ public class DefaultLitePullConsumerTest {
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(500L);
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
+ litePullConsumer.pause(Collections.singletonList(messageQueue));
litePullConsumer.seekToEnd(messageQueue);
Field field = DefaultLitePullConsumerImpl.class.getDeclaredField("assignedMessageQueue");
field.setAccessible(true);
@@ -229,6 +232,7 @@ public class DefaultLitePullConsumerTest {
when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
MessageQueue messageQueue = createMessageQueue();
litePullConsumer.assign(Collections.singletonList(messageQueue));
+ litePullConsumer.pause(Collections.singletonList(messageQueue));
try {
litePullConsumer.seek(messageQueue, -1);
failBecauseExceptionWasNotThrown(MQClientException.class);
@@ -517,9 +521,6 @@ public class DefaultLitePullConsumerTest {
public void testConsumerAfterShutdown() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer = createSubscribeLitePullConsumer();
- DefaultLitePullConsumer mockConsumer = spy(defaultLitePullConsumer);
- when(mockConsumer.poll(anyLong())).thenReturn(new ArrayList<>());
-
new AsyncConsumer().executeAsync(defaultLitePullConsumer);
Thread.sleep(100);
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 924ee12..93a4ae2 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -24,8 +24,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
@@ -60,11 +60,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
@@ -77,20 +74,20 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DefaultMQPushConsumerImpl.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPushConsumerTest {
private String consumerGroup;
private String topic = "FooBar";
private String brokerName = "BrokerA";
private MQClientInstance mQClientFactory;
+ private final byte[] msgBody = Long.toString(System.currentTimeMillis()).getBytes();
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private PullAPIWrapper pullAPIWrapper;
private RebalancePushImpl rebalancePushImpl;
private DefaultMQPushConsumer pushConsumer;
+ private AtomicLong queueOffset = new AtomicLong(1024); // offset stamped on each generated test message; also seeds the pull request's next offset
@Before
public void init() throws Exception {
@@ -108,12 +105,15 @@ public class DefaultMQPushConsumerTest {
});
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
- PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
+ field = DefaultMQPushConsumerImpl.class.getDeclaredField("doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged");
+ field.setAccessible(true);
+ field.set(null, true);
+
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
@@ -143,8 +143,9 @@ public class DefaultMQPushConsumerTest {
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
- messageClientExt.setMsgId("123");
- messageClientExt.setBody(new byte[] {'a'});
+ messageClientExt.setQueueOffset(queueOffset.getAndIncrement());
+ messageClientExt.setMsgId("1024");
+ messageClientExt.setBody(msgBody);
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
@@ -190,10 +191,10 @@ public class DefaultMQPushConsumerTest {
pullMessageService.executePullRequestImmediately(createPullRequest());
countDownLatch.await();
assertThat(messageExts[0].getTopic()).isEqualTo(topic);
- assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
+ assertThat(messageExts[0].getBody()).isEqualTo(msgBody);
}
- @Test
+ @Test(timeout = 20000)
public void testPullMessage_SuccessWithOrderlyService() throws Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final MessageExt[] messageExts = new MessageExt[1];
@@ -213,9 +214,9 @@ public class DefaultMQPushConsumerTest {
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestLater(createPullRequest(), 100);
- countDownLatch.await(10, TimeUnit.SECONDS);
+ countDownLatch.await();
assertThat(messageExts[0].getTopic()).isEqualTo(topic);
- assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
+ assertThat(messageExts[0].getBody()).isEqualTo(msgBody);
}
@Test
@@ -259,7 +260,7 @@ public class DefaultMQPushConsumerTest {
}
}
- @Test
+ @Test(timeout = 20000)
public void testGracefulShutdown() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
pushConsumer.setAwaitTerminationMillisWhenShutdown(2000);
@@ -268,6 +269,7 @@ public class DefaultMQPushConsumerTest {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
+ assertThat(msgs.get(0).getBody()).isEqualTo(msgBody);
countDownLatch.countDown();
try {
Thread.sleep(1000);
@@ -302,7 +304,7 @@ public class DefaultMQPushConsumerTest {
private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
- pullRequest.setNextOffset(1024);
+ pullRequest.setNextOffset(queueOffset.get());
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index 3f00d9e..c91d55a 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -17,7 +17,18 @@
package org.apache.rocketmq.client.impl;
import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.AckStatus;
+import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.SendMessageContext;
@@ -26,12 +37,42 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
+import org.apache.rocketmq.common.protocol.body.QueryAssignmentResponseBody;
+import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
+import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PopMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -39,6 +80,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -67,9 +109,11 @@ public class MQClientAPIImplTest {
private String brokerAddr = "127.0.0.1";
private String brokerName = "DefaultBroker";
+ private String clusterName = "DefaultCluster";
private static String group = "FooBarGroup";
private static String topic = "FooBar";
private Message msg = new Message("FooBar", new byte[] {});
+ private static String clientId = "127.0.0.2@UnitTest";
@Before
public void init() throws Exception {
@@ -113,7 +157,7 @@ public class MQClientAPIImplTest {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
RemotingCommand request = mock.getArgument(1);
- return createSuccessResponse(request);
+ return createSendMessageSuccessResponse(request);
}
}).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
@@ -166,7 +210,7 @@ public class MQClientAPIImplTest {
InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1);
ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
- responseFuture.setResponseCommand(createSuccessResponse(request));
+ responseFuture.setResponseCommand(createSendMessageSuccessResponse(request));
callback.operationComplete(responseFuture);
return null;
}
@@ -332,7 +376,7 @@ public class MQClientAPIImplTest {
InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1);
ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
- responseFuture.setResponseCommand(createSuccessResponse(request));
+ responseFuture.setResponseCommand(createSendMessageSuccessResponse(request));
callback.operationComplete(responseFuture);
return null;
}
@@ -356,6 +400,444 @@ public class MQClientAPIImplTest {
}, null, null, 0, sendMessageContext, defaultMQProducerImpl);
}
+ @Test // stubs invokeSync with a SUCCESS response whose body holds one MessageQueueAssignment; expects queryAssignment to return a set of size 1
+ public void testQueryAssignment_Success() throws Exception {
+ doAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock mock) {
+ RemotingCommand request = mock.getArgument(1);
+
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque()); // echo the request's opaque id on the canned response
+ QueryAssignmentResponseBody b = new QueryAssignmentResponseBody();
+ b.setMessageQueueAssignments(Collections.singleton(new MessageQueueAssignment())); // exactly one assignment in the body
+ response.setBody(b.encode());
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+ Set<MessageQueueAssignment> assignments = mqClientAPI.queryAssignment(brokerAddr, topic, group, clientId, null, MessageModel.CLUSTERING, 10 * 1000);
+ assertThat(assignments).size().isEqualTo(1);
+ }
+
+ @Test // feeds popMessageAsync a stubbed SUCCESS response carrying one encoded message and checks the resulting PopResult
+ public void testPopMessageAsync_Success() throws Exception {
+ final long popTime = System.currentTimeMillis();
+ final int invisibleTime = 10 * 1000;
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock mock) throws Throwable {
+ InvokeCallback callback = mock.getArgument(3);
+ RemotingCommand request = mock.getArgument(1);
+ ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
+ RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+
+ PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
+ responseHeader.setInvisibleTime(invisibleTime);
+ responseHeader.setPopTime(popTime);
+ responseHeader.setReviveQid(0);
+ responseHeader.setRestNum(1);
+ StringBuilder startOffsetInfo = new StringBuilder(64);
+ ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, false, 0, 0L);
+ responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
+ StringBuilder msgOffsetInfo = new StringBuilder(64);
+ ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, false, 0, Collections.singletonList(0L));
+ responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
+ response.setRemark("FOUND");
+ response.makeCustomHeaderToNet();
+
+ MessageExt message = new MessageExt(); // the single message encoded into the response body
+ message.setQueueId(0);
+ message.setFlag(12);
+ message.setQueueOffset(0L);
+ message.setCommitLogOffset(100L);
+ message.setSysFlag(0);
+ message.setBornTimestamp(System.currentTimeMillis());
+ message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+ message.setStoreTimestamp(System.currentTimeMillis());
+ message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+ message.setBody("body".getBytes());
+ message.setTopic(topic);
+ message.putUserProperty("key", "value");
+ response.setBody(MessageDecoder.encode(message, false));
+ responseFuture.setResponseCommand(response);
+ callback.operationComplete(responseFuture); // the stub completes the callback inline, inside the invokeAsync call
+ return null;
+ }
+ }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+ final CountDownLatch done = new CountDownLatch(1);
+ mqClientAPI.popMessageAsync(brokerName, brokerAddr, new PopMessageRequestHeader(), 10 * 1000, new PopCallback() {
+ @Override public void onSuccess(PopResult popResult) {
+ assertThat(popResult.getPopStatus()).isEqualTo(PopStatus.FOUND);
+ assertThat(popResult.getRestNum()).isEqualTo(1);
+ assertThat(popResult.getInvisibleTime()).isEqualTo(invisibleTime);
+ assertThat(popResult.getPopTime()).isEqualTo(popTime);
+ assertThat(popResult.getMsgFoundList()).size().isEqualTo(1);
+ done.countDown();
+ }
+
+ @Override public void onException(Throwable e) {
+ done.countDown(); // release the latch first: fail() always throws, so a countDown placed after it never runs
+ Assertions.fail("want no exception but got one", e);
+ }
+ });
+ done.await();
+ }
+
+ @Test // feeds ackMessageAsync a stubbed SUCCESS response and expects the callback to receive AckStatus.OK
+ public void testAckMessageAsync_Success() throws Exception {
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock mock) throws Throwable {
+ InvokeCallback callback = mock.getArgument(3);
+ RemotingCommand request = mock.getArgument(1);
+ ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
+ RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
+ response.setOpaque(request.getOpaque());
+ response.setCode(ResponseCode.SUCCESS);
+ responseFuture.setResponseCommand(response);
+ callback.operationComplete(responseFuture); // the stub completes the callback inline, inside the invokeAsync call
+ return null;
+ }
+ }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+
+ final CountDownLatch done = new CountDownLatch(1);
+ mqClientAPI.ackMessageAsync(brokerAddr, 10 * 1000, new AckCallback() {
+ @Override public void onSuccess(AckResult ackResult) {
+ assertThat(ackResult.getStatus()).isEqualTo(AckStatus.OK);
+ done.countDown();
+ }
+
+ @Override public void onException(Throwable e) {
+ done.countDown(); // release the latch first: fail() always throws, so a countDown placed after it never runs
+ Assertions.fail("want no exception but got one", e);
+ }
+ }, new AckMessageRequestHeader());
+ done.await();
+ }
+
+ @Test // feeds changeInvisibleTimeAsync a stubbed SUCCESS response and expects the callback to receive AckStatus.OK
+ public void testChangeInvisibleTimeAsync_Success() throws Exception {
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock mock) throws Throwable {
+ InvokeCallback callback = mock.getArgument(3);
+ RemotingCommand request = mock.getArgument(1);
+ ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
+ RemotingCommand response = RemotingCommand.createResponseCommand(ChangeInvisibleTimeResponseHeader.class);
+ response.setOpaque(request.getOpaque());
+ response.setCode(ResponseCode.SUCCESS);
+ ChangeInvisibleTimeResponseHeader responseHeader = (ChangeInvisibleTimeResponseHeader) response.readCustomHeader();
+ responseHeader.setPopTime(System.currentTimeMillis());
+ responseHeader.setInvisibleTime(10 * 1000L);
+ responseFuture.setResponseCommand(response);
+ callback.operationComplete(responseFuture); // the stub completes the callback inline, inside the invokeAsync call
+ return null;
+ }
+ }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+
+ final CountDownLatch done = new CountDownLatch(1);
+ ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(0);
+ requestHeader.setOffset(0L);
+ requestHeader.setInvisibleTime(10 * 1000L);
+ mqClientAPI.changeInvisibleTimeAsync(brokerName, brokerAddr, requestHeader, 10 * 1000, new AckCallback() {
+ @Override public void onSuccess(AckResult ackResult) {
+ assertThat(ackResult.getStatus()).isEqualTo(AckStatus.OK);
+ done.countDown();
+ }
+
+ @Override public void onException(Throwable e) {
+ done.countDown(); // release the latch first: fail() always throws, so a countDown placed after it never runs
+ Assertions.fail("want no exception but got one", e);
+ }
+ });
+ done.await();
+ }
+
+ @Test // calls setMessageRequestMode against a stubbed SUCCESS response; nothing is asserted on the outcome, so only a thrown exception fails it
+ public void testSetMessageRequestMode_Success() throws Exception {
+ doAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock mock) {
+ RemotingCommand request = mock.getArgument(1);
+
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque()); // echo the request's opaque id on the canned response
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ mqClientAPI.setMessageRequestMode(brokerAddr, topic, group, MessageRequestMode.POP, 8, 10 * 1000L);
+ }
+
+ @Test // calls createSubscriptionGroup against a stubbed SUCCESS response; nothing is asserted on the outcome, so only a thrown exception fails it
+ public void testCreateSubscriptionGroup_Success() throws Exception {
+ doAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock mock) {
+ RemotingCommand request = mock.getArgument(1);
+
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque()); // echo the request's opaque id on the canned response
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ mqClientAPI.createSubscriptionGroup(brokerAddr, new SubscriptionGroupConfig(), 10000);
+ }
+
+ @Test // calls createTopic against a stubbed SUCCESS response; nothing is asserted on the outcome, so only a thrown exception fails it
+ public void testCreateTopic_Success() throws Exception {
+ doAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock mock) {
+ RemotingCommand request = mock.getArgument(1);
+
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque()); // echo the request's opaque id on the canned response
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ mqClientAPI.createTopic(brokerAddr, topic, new TopicConfig(), 10000);
+ }
+
+ @Test // stubs a SUCCESS response whose header carries a DataVersion JSON plus broker/cluster names; expects a positive timestamp in the returned ACL version info
+ public void testGetBrokerClusterAclInfo() throws Exception {
+ doAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock mock) {
+ RemotingCommand request = mock.getArgument(1);
+
+ RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerAclConfigResponseHeader.class);
+ GetBrokerAclConfigResponseHeader responseHeader = (GetBrokerAclConfigResponseHeader) response.readCustomHeader();
+ responseHeader.setVersion(new DataVersion().toJson()); // version travels as a JSON string in the header
+ responseHeader.setBrokerAddr(brokerAddr);
+ responseHeader.setBrokerName(brokerName);
+ responseHeader.setClusterName(clusterName);
+ response.makeCustomHeaderToNet();
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ ClusterAclVersionInfo info = mqClientAPI.getBrokerClusterAclInfo(brokerAddr, 10000);
+ assertThat(info.getAclConfigDataVersion().getTimestamp()).isGreaterThan(0);
+ }
+
+ @Test // stubs a SUCCESS response whose body holds one white-list address and one PlainAccessConfig; expects both lists non-empty in the returned AclConfig
+ public void testGetBrokerClusterConfig() throws Exception {
+ doAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock mock) {
+ RemotingCommand request = mock.getArgument(1);
+
+ RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerClusterAclConfigResponseHeader.class);
+ GetBrokerClusterAclConfigResponseBody body = new GetBrokerClusterAclConfigResponseBody();
+ body.setGlobalWhiteAddrs(Collections.singletonList("1.1.1.1"));
+ body.setPlainAccessConfigs(Collections.singletonList(new PlainAccessConfig()));
+ response.setBody(body.encode());
+ response.makeCustomHeaderToNet();
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ AclConfig aclConfig = mqClientAPI.getBrokerClusterConfig(brokerAddr, 10000);
+ assertThat(aclConfig.getPlainAccessConfigs()).size().isGreaterThan(0);
+ assertThat(aclConfig.getGlobalWhiteAddrs()).size().isGreaterThan(0);
+ }
+
+ @Test // stubs a SUCCESS response whose body is one encoded MessageExt; expects viewMessage to return a message with the same topic
+ public void testViewMessage() throws Exception {
+ doAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock mock) throws Exception {
+ RemotingCommand request = mock.getArgument(1);
+
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ MessageExt message = new MessageExt(); // the message encoded into the response body
+ message.setQueueId(0);
+ message.setFlag(12);
+ message.setQueueOffset(0L);
+ message.setCommitLogOffset(100L);
+ message.setSysFlag(0);
+ message.setBornTimestamp(System.currentTimeMillis());
+ message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+ message.setStoreTimestamp(System.currentTimeMillis());
+ message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+ message.setBody("body".getBytes());
+ message.setTopic(topic);
+ message.putUserProperty("key", "value");
+ response.setBody(MessageDecoder.encode(message, false));
+ response.makeCustomHeaderToNet();
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ MessageExt messageExt = mqClientAPI.viewMessage(brokerAddr, 100L, 10000);
+ assertThat(messageExt.getTopic()).isEqualTo(topic);
+ }
+
+ @Test // stubs a SUCCESS response whose header offset is 100; expects searchOffset to return 100
+ public void testSearchOffset() throws Exception {
+ doAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock mock) {
+ RemotingCommand request = mock.getArgument(1);
+
+ final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
+ final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader();
+ responseHeader.setOffset(100L); // the value the assertion below looks for
+ response.makeCustomHeaderToNet();
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ long offset = mqClientAPI.searchOffset(brokerAddr, topic, 0, System.currentTimeMillis() - 1000, 10000);
+ assertThat(offset).isEqualTo(100L);
+ }
+
+ @Test // stubs a SUCCESS response whose header offset is 100; expects getMaxOffset to return 100
+ public void testGetMaxOffset() throws Exception {
+ doAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock mock) {
+ RemotingCommand request = mock.getArgument(1);
+
+ final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
+ final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
+ responseHeader.setOffset(100L); // the value the assertion below looks for
+ response.makeCustomHeaderToNet();
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ long offset = mqClientAPI.getMaxOffset(brokerAddr, topic, 0, 10000);
+ assertThat(offset).isEqualTo(100L);
+ }
+
+ @Test // stubs a SUCCESS response whose header offset is 100; expects getMinOffset to return 100
+ public void testGetMinOffset() throws Exception {
+ doAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock mock) {
+ RemotingCommand request = mock.getArgument(1);
+
+ final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
+ final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
+ responseHeader.setOffset(100L); // the value the assertion below looks for
+ response.makeCustomHeaderToNet();
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ long offset = mqClientAPI.getMinOffset(brokerAddr, topic, 0, 10000);
+ assertThat(offset).isEqualTo(100L);
+ }
+
+ @Test // stubs a SUCCESS response whose header timestamp is 100; expects getEarliestMsgStoretime to return 100
+ public void testGetEarliestMsgStoretime() throws Exception {
+ doAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock mock) {
+ RemotingCommand request = mock.getArgument(1);
+
+ final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
+ final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader();
+ responseHeader.setTimestamp(100L); // the value the assertion below looks for
+ response.makeCustomHeaderToNet();
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ long t = mqClientAPI.getEarliestMsgStoretime(brokerAddr, topic, 0, 10000);
+ assertThat(t).isEqualTo(100L);
+ }
+
+ @Test // stubs a SUCCESS response whose header offset is 100; expects queryConsumerOffset to return 100
+ public void testQueryConsumerOffset() throws Exception {
+ doAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock mock) {
+ RemotingCommand request = mock.getArgument(1);
+
+ final RemotingCommand response =
+ RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
+ final QueryConsumerOffsetResponseHeader responseHeader =
+ (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
+ responseHeader.setOffset(100L); // the value the assertion below looks for
+ response.makeCustomHeaderToNet();
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ long t = mqClientAPI.queryConsumerOffset(brokerAddr, new QueryConsumerOffsetRequestHeader(), 1000);
+ assertThat(t).isEqualTo(100L);
+ }
+
+ @Test // calls updateConsumerOffset against a stubbed SUCCESS response; nothing is asserted on the outcome, so only a thrown exception fails it
+ public void testUpdateConsumerOffset() throws Exception {
+ doAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock mock) {
+ RemotingCommand request = mock.getArgument(1);
+
+ final RemotingCommand response =
+ RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
+ response.makeCustomHeaderToNet();
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque()); // echo the request's opaque id on the canned response
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ mqClientAPI.updateConsumerOffset(brokerAddr, new UpdateConsumerOffsetRequestHeader(), 1000);
+ }
+
+ @Test // stubs a SUCCESS response whose body lists one consumer id; expects getConsumerIdListByGroup to return a non-empty list
+ public void testGetConsumerIdListByGroup() throws Exception {
+ doAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock mock) {
+ RemotingCommand request = mock.getArgument(1);
+
+ final RemotingCommand response =
+ RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
+ GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
+ body.setConsumerIdList(Collections.singletonList("consumer1")); // single id carried in the body
+ response.setBody(body.encode());
+ response.makeCustomHeaderToNet();
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+ List<String> consumerIdList = mqClientAPI.getConsumerIdListByGroup(brokerAddr, group, 10000);
+ assertThat(consumerIdList).size().isGreaterThan(0);
+ }
+
private RemotingCommand createResumeSuccessResponse(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
@@ -363,7 +845,7 @@ public class MQClientAPIImplTest {
return response;
}
- private RemotingCommand createSuccessResponse(RemotingCommand request) {
+ private RemotingCommand createSendMessageSuccessResponse(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
index d4f5812..702f7b1 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
@@ -18,21 +18,66 @@
package org.apache.rocketmq.client.impl.consumer;
import java.util.List;
+import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.hook.FilterMessageContext;
+import org.apache.rocketmq.client.hook.FilterMessageHook;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPushConsumerImplTest {
+ @Mock
+ private DefaultMQPushConsumer defaultMQPushConsumer;
@Rule
public ExpectedException thrown = ExpectedException.none();
+ @Before // stubs the getters of the mocked DefaultMQPushConsumer with fixed values before each test
+ public void setUp() throws Exception {
+ when(defaultMQPushConsumer.getConsumerGroup()).thenReturn("test_group");
+ when(defaultMQPushConsumer.getMessageModel()).thenReturn(MessageModel.CLUSTERING);
+ when(defaultMQPushConsumer.getConsumeFromWhere()).thenReturn(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ when(defaultMQPushConsumer.getConsumeTimestamp()).thenReturn(UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30))); // 30 minutes before now
+ when(defaultMQPushConsumer.getAllocateMessageQueueStrategy()).thenReturn(new AllocateMessageQueueAveragely());
+ when(defaultMQPushConsumer.getConsumeThreadMin()).thenReturn(20);
+ when(defaultMQPushConsumer.getConsumeThreadMax()).thenReturn(30);
+ when(defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()).thenReturn(2000);
+ when(defaultMQPushConsumer.getPullThresholdForQueue()).thenReturn(1000);
+ when(defaultMQPushConsumer.getPullThresholdForTopic()).thenReturn(-1);
+ when(defaultMQPushConsumer.getPullThresholdSizeForQueue()).thenReturn(100);
+ when(defaultMQPushConsumer.getPullThresholdSizeForTopic()).thenReturn(-1);
+ when(defaultMQPushConsumer.getConsumeMessageBatchMaxSize()).thenReturn(1);
+ when(defaultMQPushConsumer.getPullBatchSize()).thenReturn(32);
+ when(defaultMQPushConsumer.getPopInvisibleTime()).thenReturn(60000L);
+ when(defaultMQPushConsumer.getPopBatchNums()).thenReturn(32);
+ when(defaultMQPushConsumer.getClientIP()).thenReturn("127.0.0.1");
+ when(defaultMQPushConsumer.getInstanceName()).thenReturn("test_instance");
+ when(defaultMQPushConsumer.buildMQClientId()).thenCallRealMethod(); // run the real implementation on the mock instead of returning a canned id
+ ClientConfig clientConfig = new ClientConfig();
+ when(defaultMQPushConsumer.cloneClientConfig()).thenReturn(clientConfig);
+ when(defaultMQPushConsumer.getConsumeTimeout()).thenReturn(15L);
+ }
+
@Test
public void checkConfigTest() throws MQClientException {
@@ -58,4 +103,51 @@ public class DefaultMQPushConsumerImplTest {
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(consumer, null);
defaultMQPushConsumerImpl.start();
}
+
+ @Test
+ public void testHook() throws Exception { // Registers a consume hook and a filter hook, then calls executeHookBefore/executeHookAfter with fresh contexts.
+ DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(defaultMQPushConsumer, null);
+ defaultMQPushConsumerImpl.registerConsumeMessageHook(new ConsumeMessageHook() {
+ @Override public String hookName() {
+ return "consumerHook";
+ }
+
+ @Override public void consumeMessageBefore(ConsumeMessageContext context) {
+ assertThat(context).isNotNull();
+ }
+
+ @Override public void consumeMessageAfter(ConsumeMessageContext context) {
+ assertThat(context).isNotNull();
+ }
+ });
+ defaultMQPushConsumerImpl.registerFilterMessageHook(new FilterMessageHook() { // NOTE(review): nothing in this test visibly triggers filterMessage, so the assertion inside may never run - confirm
+ @Override public String hookName() {
+ return "filterHook";
+ }
+
+ @Override public void filterMessage(FilterMessageContext context) {
+ assertThat(context).isNotNull();
+ }
+ });
+ defaultMQPushConsumerImpl.executeHookBefore(new ConsumeMessageContext()); // presumably dispatches to consumeMessageBefore on the hook registered above - confirm
+ defaultMQPushConsumerImpl.executeHookAfter(new ConsumeMessageContext()); // presumably dispatches to consumeMessageAfter on the hook registered above - confirm
+ }
+
+ @Test
+ public void testPush() throws Exception { // Starts a consumer impl built from the mocked config with a concurrent listener, then shuts it down.
+ when(defaultMQPushConsumer.getMessageListener()).thenReturn(new MessageListenerConcurrently() {
+ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+ ConsumeConcurrentlyContext context) {
+ assertThat(msgs).size().isGreaterThan(0); // only evaluated if the listener is actually invoked
+ assertThat(context).isNotNull();
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(defaultMQPushConsumer, null);
+ try {
+ defaultMQPushConsumerImpl.start(); // no assertion follows; the test body itself only checks that start() and shutdown() do not throw
+ } finally {
+ defaultMQPushConsumerImpl.shutdown(); // always shut down, even when start() throws
+ }
+ }
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
index 796a394..b7a42e1 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
@@ -22,23 +22,28 @@ import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -51,6 +56,7 @@ public class RebalancePushImplTest {
private OffsetStore offsetStore;
private String consumerGroup = "CID_RebalancePushImplTest";
private String topic = "TopicA";
+ private final String brokerName = "BrokerA";
@Test
public void testMessageQueueChanged_CountThreshold() {
@@ -88,16 +94,15 @@ public class RebalancePushImplTest {
rebalancePush.subscriptionInner.putIfAbsent(topic, new SubscriptionData());
+ try {
+ when(mqClientInstance.queryAssignment(anyString(), anyString(), anyString(), any(MessageModel.class), anyInt())).thenThrow(new RemotingTimeoutException("unsupported"));
+ } catch (RemotingException ignored) {
+ } catch (InterruptedException ignored) {
+ } catch (MQBrokerException ignored) {
+ }
when(mqClientInstance.findConsumerIdList(anyString(), anyString())).thenReturn(Collections.singletonList(consumerGroup));
when(mqClientInstance.getClientId()).thenReturn(consumerGroup);
when(defaultMQPushConsumer.getOffsetStore()).thenReturn(offsetStore);
-
- doAnswer(new Answer() {
- @Override
- public Object answer(final InvocationOnMock invocation) throws Throwable {
- return null;
- }
- }).when(defaultMQPushConsumer).executePullRequestImmediately(any(PullRequest.class));
}
@Test
@@ -134,8 +139,8 @@ public class RebalancePushImplTest {
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024);
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024);
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
- allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
- allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
+ allocateResultSet.add(new MessageQueue(topic, brokerName, 0));
+ allocateResultSet.add(new MessageQueue(topic, brokerName, 1));
doRebalanceForcibly(rebalancePush, allocateResultSet);
defaultMQPushConsumer.setConsumeMessageService(new ConsumeMessageConcurrentlyService(defaultMQPushConsumer, null));
@@ -160,4 +165,22 @@ public class RebalancePushImplTest {
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024");
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024");
}
+
+ @Test
+ public void testDoRebalancePull() throws Exception { // Checks that doRebalance(false) returns true when queryAssignment yields a single PULL-mode queue.
+ RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
+ new AllocateMessageQueueAveragely(), mqClientInstance, defaultMQPushConsumer);
+ rebalancePush.getSubscriptionInner().putIfAbsent(topic, new SubscriptionData());
+ rebalancePush.subscriptionInner.putIfAbsent(topic, new SubscriptionData()); // NOTE(review): looks redundant with the previous line if getSubscriptionInner() returns this same map - confirm
+
+ when(mqClientInstance.getClientId()).thenReturn(consumerGroup);
+ when(defaultMQPushConsumer.getOffsetStore()).thenReturn(offsetStore);
+ doNothing().when(defaultMQPushConsumer).executePullRequestLater(any(PullRequest.class), anyLong()); // stubbed to do nothing for any request and delay
+ MessageQueueAssignment queueAssignment = new MessageQueueAssignment();
+ queueAssignment.setMode(MessageRequestMode.PULL);
+ queueAssignment.setMessageQueue(new MessageQueue(topic, brokerName, 0));
+ when(mqClientInstance.queryAssignment(anyString(), anyString(), anyString(), any(MessageModel.class), anyInt())).thenReturn(Collections.singleton(queueAssignment)); // the only assignment returned: queue 0 of brokerName in PULL mode
+
+ assertThat(rebalancePush.doRebalance(false)).isTrue();
+ }
}
\ No newline at end of file
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index a3457e1..95582c5 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -180,4 +180,5 @@ public class MQClientInstanceTest {
flag = mqClientInstance.registerAdminExt(group, mock(MQAdminExtInner.class));
assertThat(flag).isTrue();
}
+
}
\ No newline at end of file
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index e1771b9..8902396 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -48,7 +48,6 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.junit.After;
import org.junit.Before;
@@ -56,9 +55,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
@@ -260,7 +257,7 @@ public class DefaultMQProducerTest {
}
};
- List<Message> msgs = new ArrayList<>();
+ List<Message> msgs = new ArrayList<Message>();
for (int i = 0; i < 5; i++) {
Message message = new Message();
message.setTopic("test");
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index 5ea517c..7b75e6b 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -53,13 +53,11 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -73,7 +71,6 @@ import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -89,7 +86,6 @@ import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(DefaultMQPushConsumerImpl.class)
-@PowerMockIgnore("javax.management.*")
public class DefaultMQConsumerWithTraceTest {
private String consumerGroup;
private String consumerGroupNormal;
diff --git a/client/src/test/resources/org/powermock/extensions/configuration.properties b/client/src/test/resources/org/powermock/extensions/configuration.properties
new file mode 100644
index 0000000..6389eff
--- /dev/null
+++ b/client/src/test/resources/org/powermock/extensions/configuration.properties
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+powermock.global-ignore=javax.management.*
\ No newline at end of file