You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2017/08/11 06:37:53 UTC
incubator-rocketmq git commit: [ROCKETMQ-231] Wrong Pull result size bugfix
Repository: incubator-rocketmq
Updated Branches:
refs/heads/develop 98bd03245 -> bcc65e547
[ROCKETMQ-231] Wrong Pull result size bugfix
Author: lindzh <li...@163.com>
Closes #126 from lindzh/fix_consumer_pull_msg_size.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/bcc65e54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/bcc65e54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/bcc65e54
Branch: refs/heads/develop
Commit: bcc65e5471d9821e0edb90f6f637e3034ed1da44
Parents: 98bd032
Author: lindzh <li...@163.com>
Authored: Fri Aug 11 14:36:53 2017 +0800
Committer: vongosling <vo...@apache.org>
Committed: Fri Aug 11 14:36:53 2017 +0800
----------------------------------------------------------------------
.../rocketmq/store/DefaultMessageStore.java | 6 +--
.../rocketmq/store/DefaultMessageStoreTest.java | 53 ++++++++++++++++----
.../org/apache/rocketmq/test/base/BaseConf.java | 2 +-
.../rocketmq/test/base/IntegrationTestBase.java | 10 ++--
.../broadcast/order/OrderMsgBroadCastIT.java | 8 +--
5 files changed, 60 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/bcc65e54/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index b5bac3f..7b5ac45 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1111,7 +1111,7 @@ public class DefaultMessageStore implements MessageStore {
return false;
}
- if ((messageTotal + 1) >= maxMsgNums) {
+ if (maxMsgNums <= messageTotal) {
return true;
}
@@ -1120,7 +1120,7 @@ public class DefaultMessageStore implements MessageStore {
return true;
}
- if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) {
+ if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) {
return true;
}
} else {
@@ -1128,7 +1128,7 @@ public class DefaultMessageStore implements MessageStore {
return true;
}
- if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) {
+ if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) {
return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/bcc65e54/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 75f1de9..273cc21 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -22,9 +22,11 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Before;
import org.junit.Test;
@@ -45,19 +47,22 @@ public class DefaultMessageStoreTest {
BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
}
+ public MessageStore buildMessageStore() throws Exception {
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10);
+ messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10);
+ messageStoreConfig.setMaxHashSlotNum(10000);
+ messageStoreConfig.setMaxIndexNum(100 * 100);
+ messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
+ return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MyMessageArrivingListener(), new BrokerConfig());
+ }
+
@Test
public void testWriteAndRead() throws Exception {
long totalMsgs = 100;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
-
- MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
- messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
- messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
- messageStoreConfig.setMaxHashSlotNum(100);
- messageStoreConfig.setMaxIndexNum(100 * 10);
- MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig());
-
+ MessageStore master = buildMessageStore();
boolean load = master.load();
assertTrue(load);
@@ -86,7 +91,7 @@ public class DefaultMessageStoreTest {
msg.setBody(MessageBody);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
- msg.setSysFlag(4);
+ msg.setSysFlag(0);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(StoreHost);
msg.setBornHost(BornHost);
@@ -123,6 +128,36 @@ public class DefaultMessageStoreTest {
}
}
+ @Test
+ public void testPullSize() throws Exception {
+ MessageStore messageStore = buildMessageStore();
+ boolean load = messageStore.load();
+ assertTrue(load);
+ messageStore.start();
+ String topic = "pullSizeTopic";
+
+ for (int i = 0; i < 32; i++) {
+ MessageExtBrokerInner messageExtBrokerInner = buildMessage();
+ messageExtBrokerInner.setTopic(topic);
+ messageExtBrokerInner.setQueueId(0);
+ PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+ }
+ //wait for consume queue build
+ Thread.sleep(100);
+ String group = "simple";
+ GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null);
+ assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32);
+
+
+ GetMessageResult getMessageResult20 = messageStore.getMessage(group, topic, 0, 0, 20, null);
+ assertThat(getMessageResult20.getMessageBufferList().size()).isEqualTo(20);
+
+ GetMessageResult getMessageResult45 = messageStore.getMessage(group, topic, 0, 0, 10, null);
+ assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10);
+
+ messageStore.shutdown();
+ }
+
private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/bcc65e54/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 92f77b8..8516779 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -41,7 +41,7 @@ public class BaseConf {
protected static String clusterName;
protected static int brokerNum;
protected static int waitTime = 5;
- protected static int consumeTime = 1 * 60 * 1000;
+ protected static int consumeTime = 5 * 60 * 1000;
protected static NamesrvController namesrvController;
protected static BrokerController brokerController1;
protected static BrokerController brokerController2;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/bcc65e54/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 07af7aa..61e98e2 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -148,17 +148,17 @@ public class IntegrationTestBase {
return brokerController;
}
- public static boolean initTopic(String topic, String nsAddr, String clusterName) {
+ public static boolean initTopic(String topic, String nsAddr, String clusterName,int queueNumbers){
long startTime = System.currentTimeMillis();
boolean createResult;
while (true) {
- createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, 8);
+ createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, queueNumbers);
if (createResult) {
break;
} else if (System.currentTimeMillis() - startTime > topicCreateTime) {
Assert.fail(String.format("topic[%s] is created failed after:%d ms", topic,
- System.currentTimeMillis() - startTime));
+ System.currentTimeMillis() - startTime));
break;
} else {
TestUtils.waitForMoment(500);
@@ -169,6 +169,10 @@ public class IntegrationTestBase {
return createResult;
}
+ public static boolean initTopic(String topic, String nsAddr, String clusterName) {
+ return initTopic(topic, nsAddr, clusterName,8);
+ }
+
public static void deleteFile(File file) {
if (!file.exists()) {
return;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/bcc65e54/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
index 2c9abc0..47cde74 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
@@ -38,6 +38,8 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT {
private RMQNormalProducer producer = null;
private String topic = null;
+ private int broadcastConsumeTime = 1 * 60 * 1000;
+
@Before
public void setUp() {
topic = initTopic();
@@ -60,12 +62,12 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT {
consumer1.getConsumerGroup(), topic, "*", new RMQOrderListener());
TestUtils.waitForSeconds(waitTime);
+
List<MessageQueue> mqs = producer.getMessageQueue();
MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
producer.send(mqMsgs.getMsgsWithMQ());
-
- consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
- consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+ consumer1.getListener().waitForMessageConsume(producer.getAllMsgBody(), broadcastConsumeTime);
+ consumer2.getListener().waitForMessageConsume(producer.getAllMsgBody(), broadcastConsumeTime);
assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListener()).getMsgs()))
.isEqualTo(true);