You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2017/10/19 03:57:55 UTC

[GitHub] Jaskey closed pull request #66: [ROCKETMQ-106] Add flow control on topic level

Jaskey closed pull request #66: [ROCKETMQ-106] Add flow control on topic level
URL: https://github.com/apache/rocketmq/pull/66
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 9c9b59ef..cf55a951 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -166,11 +166,18 @@
     private int consumeConcurrentlyMaxSpan = 2000;
 
     /**
-     * Flow control threshold
+     * Flow control threshold on queue level
      */
     private int pullThresholdForQueue = 1000;
 
     /**
+     * Introduced since 4.1.x
+     * Flow control threshold on topic level.The value should be greater or equals than pullThresholdForQueue otherwise flow control for topic will always come first before flow control on queue.
+     * By default, it is set to max value of Integer.MAX_VALUE, which means flow control for topic is disable.
+     */
+    private int pullThresholdForTopic = Integer.MAX_VALUE;
+
+    /**
      * Message pull Interval
      */
     private long pullInterval = 0;
@@ -401,6 +408,14 @@ public void setPullThresholdForQueue(int pullThresholdForQueue) {
         this.pullThresholdForQueue = pullThresholdForQueue;
     }
 
+    public int getPullThresholdForTopic() {
+        return pullThresholdForTopic;
+    }
+
+    public void setPullThresholdForTopic(int pullThresholdForTopic) {
+        this.pullThresholdForTopic = pullThresholdForTopic;
+    }
+
     public Map<String, String> getSubscription() {
         return subscription;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 9bf34be8..dbcca59d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -106,8 +106,9 @@
     private MessageListener messageListenerInner;
     private OffsetStore offsetStore;
     private ConsumeMessageService consumeMessageService;
-    private long flowControlTimes1 = 0;
-    private long flowControlTimes2 = 0;
+    private long topicFlowControlTimes = 0;
+    private long queueFlowControlTimes = 0;
+    private long queueMaxSpanFlowControlTimes = 0;
 
     public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
         this.defaultMQPushConsumer = defaultMQPushConsumer;
@@ -219,13 +220,37 @@ public void pullMessage(final PullRequest pullRequest) {
             return;
         }
 
+        //flow control for topic
+        if (this.defaultMQPushConsumer.getPullThresholdForTopic() != Integer.MAX_VALUE) {
+            Map<MessageQueue, ProcessQueue> allProcessQMap = this.getRebalanceImpl().getProcessQueueTable();
+            Iterator<Entry<MessageQueue, ProcessQueue>> it = allProcessQMap.entrySet().iterator();
+            long sizeOfAllQueue = 0;
+            //pick the relative process queues and calculate size
+            while (it.hasNext()) {
+                Entry<MessageQueue, ProcessQueue> entry = it.next();
+                if (pullRequest.getMessageQueue().getTopic().equals(entry.getKey().getTopic())) {
+                    sizeOfAllQueue += entry.getValue().getMsgCount().get();
+                }
+            }
+            if (sizeOfAllQueue > this.defaultMQPushConsumer.getPullThresholdForTopic()) {
+                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+                if ((topicFlowControlTimes++ % 1000) == 0) {
+                    log.warn(
+                        "the consumer message topic buffer is full, so do flow control, minOffset={}, maxOffset={}, sizeOfAllQueue={}, pullRequest={}, flowControlTimes={}",
+                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), sizeOfAllQueue, pullRequest, topicFlowControlTimes);
+                }
+                return;
+            }
+        }
+
+        //flow control for queue
         long size = processQueue.getMsgCount().get();
         if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
-            if ((flowControlTimes1++ % 1000) == 0) {
+            if ((queueFlowControlTimes++ % 1000) == 0) {
                 log.warn(
-                    "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
-                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
+                    "the consumer message queue buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
+                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, queueFlowControlTimes);
             }
             return;
         }
@@ -233,11 +258,11 @@ public void pullMessage(final PullRequest pullRequest) {
         if (!this.consumeOrderly) {
             if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
-                if ((flowControlTimes2++ % 1000) == 0) {
+                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                     log.warn(
                         "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                         processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
-                        pullRequest, flowControlTimes2);
+                        pullRequest, queueMaxSpanFlowControlTimes);
                 }
                 return;
             }
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index cdf1d780..1ad71fc8 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -19,10 +19,15 @@
 import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -31,6 +36,7 @@
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
@@ -52,6 +58,7 @@
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -109,7 +116,6 @@ public void init() throws Exception {
         field.setAccessible(true);
         field.set(pushConsumerImpl, mQClientFactory);
 
-
         field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
         field.setAccessible(true);
         field.set(mQClientFactory, mQClientAPIImpl);
@@ -125,27 +131,45 @@ public void init() throws Exception {
         when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
             anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
             .thenAnswer(new Answer<Object>() {
-            @Override public Object answer(InvocationOnMock mock) throws Throwable {
-                PullMessageRequestHeader requestHeader = mock.getArgument(1);
-                MessageClientExt messageClientExt = new MessageClientExt();
-                messageClientExt.setTopic(topic);
-                messageClientExt.setQueueId(0);
-                messageClientExt.setMsgId("123");
-                messageClientExt.setBody(new byte[] {'a'});
-                messageClientExt.setOffsetMsgId("234");
-                messageClientExt.setBornHost(new InetSocketAddress(8080));
-                messageClientExt.setStoreHost(new InetSocketAddress(8080));
-                PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
-                ((PullCallback)mock.getArgument(4)).onSuccess(pullResult);
-                return pullResult;
-            }
-        });
+                @Override public Object answer(InvocationOnMock mock) throws Throwable {
+                    PullMessageRequestHeader requestHeader = mock.getArgument(1);
+                    MessageClientExt messageClientExt = new MessageClientExt();
+                    messageClientExt.setTopic(topic);
+                    messageClientExt.setQueueId(((PullMessageRequestHeader) mock.getArgument(1)).getQueueId());
+                    messageClientExt.setMsgId("123");
+                    messageClientExt.setBody(new byte[] {'a'});
+                    messageClientExt.setOffsetMsgId("234");
+                    messageClientExt.setBornHost(new InetSocketAddress(8080));
+                    messageClientExt.setStoreHost(new InetSocketAddress(8080));
+                    messageClientExt.setQueueOffset(((PullMessageRequestHeader) mock.getArgument(1)).getQueueOffset());
+                    PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
+                    ((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
+                    return pullResult;
+                }
+            });
 
         doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
-        doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString());
-        Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
-        messageQueueSet.add(createPullRequest().getMessageQueue());
+        doReturn(new ArrayList<String>(Collections.singletonList(mQClientFactory.getClientId()))).when(mQClientFactory).findConsumerIdList(anyString(), anyString());
+        //START: mock allocating 4 queue
+        final Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
+        messageQueueSet.add(createPullRequest(0).getMessageQueue());
+        messageQueueSet.add(createPullRequest(1).getMessageQueue());
+        messageQueueSet.add(createPullRequest(2).getMessageQueue());
+        messageQueueSet.add(createPullRequest(3).getMessageQueue());
+        pushConsumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy() {
+            @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID,
+                List<MessageQueue> mqAll, List<String> cidAll) {
+                return new ArrayList<MessageQueue>(messageQueueSet);
+            }
+
+            @Override public String getName() {
+                return "TEST";
+            }
+        });
+        rebalancePushImpl.getTopicSubscribeInfoTable().putIfAbsent(topic, messageQueueSet);
         pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
+        //END: mock allocating 4 queue
+
         doReturn(123L).when(rebalancePushImpl).computePullFromWhere(any(MessageQueue.class));
     }
 
@@ -166,15 +190,55 @@ public void testPullMessage_Success() throws InterruptedException, RemotingExcep
                 return null;
             }
         }));
-
-        PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
-        pullMessageService.executePullRequestImmediately(createPullRequest());
+        pushConsumer.getDefaultMQPushConsumerImpl().doRebalance();
         countDownLatch.await();
         assertThat(messageExts[0].getTopic()).isEqualTo(topic);
         assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
     }
 
     @Test
+    public void testTopicFlowControl() throws Exception {
+        final int flowControl = 200;
+        final int pullBatchSize = 1;
+        pushConsumer.setPullInterval(0);
+        pushConsumer.setPullBatchSize(pullBatchSize);
+        pushConsumer.setPullThresholdForQueue(flowControl / 2);//4 queue, each flow control in 100
+        pushConsumer.setPullThresholdForTopic(flowControl);//topic flow control in 200
+        pushConsumer.setConsumeThreadMin(200);
+        pushConsumer.setConsumeThreadMax(500);
+        pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
+            @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                ConsumeConcurrentlyContext context) {
+                for (MessageExt msg : msgs) {
+                    try {
+                        Thread.sleep(2000);//block some time to make it accumulated
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        }));
+        pushConsumer.getDefaultMQPushConsumerImpl().doRebalance();
+        Thread.sleep(3000);//spend some time to consume
+
+        //START  : check flow control on topic level
+        Map<MessageQueue, ProcessQueue> allProcessQMap = pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().getProcessQueueTable();
+        Iterator<Map.Entry<MessageQueue, ProcessQueue>> it = allProcessQMap.entrySet().iterator();
+        long sizeOfAllQueue = 0;
+        //pick the relative process queues and calculate size
+        while (it.hasNext()) {
+            Map.Entry<MessageQueue, ProcessQueue> entry = it.next();
+            if (topic.equals(entry.getKey().getTopic())) {
+                sizeOfAllQueue += entry.getValue().getMsgCount().get();
+            }
+        }
+        Assert.assertTrue("topic flow control does not work as expected, actual = " + sizeOfAllQueue + " flowControl = " + flowControl, sizeOfAllQueue <= flowControl + pullBatchSize);
+        //END  : check flow control on topic level
+
+    }
+
+    @Test
     public void testPullMessage_SuccessWithOrderlyService() throws Exception {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         final MessageExt[] messageExts = new MessageExt[1];
@@ -200,13 +264,17 @@ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderly
     }
 
     private PullRequest createPullRequest() {
+        return createPullRequest(0);
+    }
+
+    private PullRequest createPullRequest(int queueId) {
         PullRequest pullRequest = new PullRequest();
         pullRequest.setConsumerGroup(consumerGroup);
         pullRequest.setNextOffset(1024);
 
         MessageQueue messageQueue = new MessageQueue();
         messageQueue.setBrokerName(brokerName);
-        messageQueue.setQueueId(0);
+        messageQueue.setQueueId(queueId);
         messageQueue.setTopic(topic);
         pullRequest.setMessageQueue(messageQueue);
         ProcessQueue processQueue = new ProcessQueue();
@@ -217,7 +285,8 @@ private PullRequest createPullRequest() {
         return pullRequest;
     }
 
-    private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, List<MessageExt> messageExtList) throws Exception {
+    private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
+        List<MessageExt> messageExtList) throws Exception {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         for (MessageExt messageExt : messageExtList) {
             outputStream.write(MessageDecoder.encode(messageExt, false));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services