You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2017/08/11 06:37:53 UTC

incubator-rocketmq git commit: [ROCKETMQ-231]Wrong Pull result sizebugfix

Repository: incubator-rocketmq
Updated Branches:
  refs/heads/develop 98bd03245 -> bcc65e547


[ROCKETMQ-231]Wrong Pull result sizebugfix

Author: lindzh <li...@163.com>

Closes #126 from lindzh/fix_consumer_pull_msg_size.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/bcc65e54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/bcc65e54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/bcc65e54

Branch: refs/heads/develop
Commit: bcc65e5471d9821e0edb90f6f637e3034ed1da44
Parents: 98bd032
Author: lindzh <li...@163.com>
Authored: Fri Aug 11 14:36:53 2017 +0800
Committer: vongosling <vo...@apache.org>
Committed: Fri Aug 11 14:36:53 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/store/DefaultMessageStore.java     |  6 +--
 .../rocketmq/store/DefaultMessageStoreTest.java | 53 ++++++++++++++++----
 .../org/apache/rocketmq/test/base/BaseConf.java |  2 +-
 .../rocketmq/test/base/IntegrationTestBase.java | 10 ++--
 .../broadcast/order/OrderMsgBroadCastIT.java    |  8 +--
 5 files changed, 60 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/bcc65e54/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index b5bac3f..7b5ac45 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1111,7 +1111,7 @@ public class DefaultMessageStore implements MessageStore {
             return false;
         }
 
-        if ((messageTotal + 1) >= maxMsgNums) {
+        if (maxMsgNums <= messageTotal) {
             return true;
         }
 
@@ -1120,7 +1120,7 @@ public class DefaultMessageStore implements MessageStore {
                 return true;
             }
 
-            if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) {
+            if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) {
                 return true;
             }
         } else {
@@ -1128,7 +1128,7 @@ public class DefaultMessageStore implements MessageStore {
                 return true;
             }
 
-            if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) {
+            if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) {
                 return true;
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/bcc65e54/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 75f1de9..273cc21 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -22,9 +22,11 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -45,19 +47,22 @@ public class DefaultMessageStoreTest {
         BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
     }
 
+    public MessageStore buildMessageStore() throws Exception {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10);
+        messageStoreConfig.setMaxHashSlotNum(10000);
+        messageStoreConfig.setMaxIndexNum(100 * 100);
+        messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
+        return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MyMessageArrivingListener(), new BrokerConfig());
+    }
+
     @Test
     public void testWriteAndRead() throws Exception {
         long totalMsgs = 100;
         QUEUE_TOTAL = 1;
         MessageBody = StoreMessage.getBytes();
-
-        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
-        messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
-        messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
-        messageStoreConfig.setMaxHashSlotNum(100);
-        messageStoreConfig.setMaxIndexNum(100 * 10);
-        MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig());
-
+        MessageStore master = buildMessageStore();
         boolean load = master.load();
         assertTrue(load);
 
@@ -86,7 +91,7 @@ public class DefaultMessageStoreTest {
         msg.setBody(MessageBody);
         msg.setKeys(String.valueOf(System.currentTimeMillis()));
         msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
-        msg.setSysFlag(4);
+        msg.setSysFlag(0);
         msg.setBornTimestamp(System.currentTimeMillis());
         msg.setStoreHost(StoreHost);
         msg.setBornHost(BornHost);
@@ -123,6 +128,36 @@ public class DefaultMessageStoreTest {
         }
     }
 
+    @Test
+    public void testPullSize() throws Exception {
+        MessageStore messageStore = buildMessageStore();
+        boolean load = messageStore.load();
+        assertTrue(load);
+        messageStore.start();
+        String topic = "pullSizeTopic";
+
+        for (int i = 0; i < 32; i++) {
+            MessageExtBrokerInner messageExtBrokerInner = buildMessage();
+            messageExtBrokerInner.setTopic(topic);
+            messageExtBrokerInner.setQueueId(0);
+            PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+        }
+        //wait for consume queue build
+        Thread.sleep(100);
+        String group = "simple";
+        GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null);
+        assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32);
+
+
+        GetMessageResult getMessageResult20 = messageStore.getMessage(group, topic, 0, 0, 20, null);
+        assertThat(getMessageResult20.getMessageBufferList().size()).isEqualTo(20);
+
+        GetMessageResult getMessageResult45 = messageStore.getMessage(group, topic, 0, 0, 10, null);
+        assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10);
+
+        messageStore.shutdown();
+    }
+
     private class MyMessageArrivingListener implements MessageArrivingListener {
         @Override
         public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/bcc65e54/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 92f77b8..8516779 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -41,7 +41,7 @@ public class BaseConf {
     protected static String clusterName;
     protected static int brokerNum;
     protected static int waitTime = 5;
-    protected static int consumeTime = 1 * 60 * 1000;
+    protected static int consumeTime = 5 * 60 * 1000;
     protected static NamesrvController namesrvController;
     protected static BrokerController brokerController1;
     protected static BrokerController brokerController2;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/bcc65e54/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 07af7aa..61e98e2 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -148,17 +148,17 @@ public class IntegrationTestBase {
         return brokerController;
     }
 
-    public static boolean initTopic(String topic, String nsAddr, String clusterName) {
+    public static boolean initTopic(String topic, String nsAddr, String clusterName,int queueNumbers){
         long startTime = System.currentTimeMillis();
         boolean createResult;
 
         while (true) {
-            createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, 8);
+            createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, queueNumbers);
             if (createResult) {
                 break;
             } else if (System.currentTimeMillis() - startTime > topicCreateTime) {
                 Assert.fail(String.format("topic[%s] is created failed after:%d ms", topic,
-                    System.currentTimeMillis() - startTime));
+                        System.currentTimeMillis() - startTime));
                 break;
             } else {
                 TestUtils.waitForMoment(500);
@@ -169,6 +169,10 @@ public class IntegrationTestBase {
         return createResult;
     }
 
+    public static boolean initTopic(String topic, String nsAddr, String clusterName) {
+        return initTopic(topic, nsAddr, clusterName,8);
+    }
+
     public static void deleteFile(File file) {
         if (!file.exists()) {
             return;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/bcc65e54/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
index 2c9abc0..47cde74 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
@@ -38,6 +38,8 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT {
     private RMQNormalProducer producer = null;
     private String topic = null;
 
+    private int broadcastConsumeTime = 1 * 60 * 1000;
+
     @Before
     public void setUp() {
         topic = initTopic();
@@ -60,12 +62,12 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT {
             consumer1.getConsumerGroup(), topic, "*", new RMQOrderListener());
         TestUtils.waitForSeconds(waitTime);
 
+
         List<MessageQueue> mqs = producer.getMessageQueue();
         MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
         producer.send(mqMsgs.getMsgsWithMQ());
-
-        consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
-        consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), broadcastConsumeTime);
+        consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), broadcastConsumeTime);
 
         assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListener()).getMsgs()))
             .isEqualTo(true);