You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vs...@apache.org on 2017/08/29 15:32:33 UTC
incubator-rocketmq git commit: Author: vsair
Repository: incubator-rocketmq
Updated Branches:
refs/heads/develop 1b853e81a -> ed4821ae0
Author: vsair <li...@gmail.com>
Closes #160 from vsair/ROCKETMQ-284.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/ed4821ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/ed4821ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/ed4821ae
Branch: refs/heads/develop
Commit: ed4821ae0d0108bcc02098f59ccafe18f0405db4
Parents: 1b853e8
Author: vsair <li...@gmail.com>
Authored: Tue Aug 29 23:32:21 2017 +0800
Committer: vsair <vs...@apache.org>
Committed: Tue Aug 29 23:32:21 2017 +0800
----------------------------------------------------------------------
.../broker/filter/ExpressionMessageFilter.java | 2 +-
.../filter/MessageStoreWithFilterTest.java | 85 ++++++++++----------
.../org/apache/rocketmq/store/ConsumeQueue.java | 2 +-
.../apache/rocketmq/store/ConsumeQueueExt.java | 2 +-
.../rocketmq/store/DefaultMessageStore.java | 40 ++++-----
5 files changed, 67 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ed4821ae/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
index 2f94de2..64c28ec 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
@@ -70,7 +70,7 @@ public class ExpressionMessageFilter implements MessageFilter {
// by tags code.
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
- if (tagsCode == null || tagsCode < 0L) {
+ if (tagsCode == null) {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ed4821ae/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
index 7978942..e544d90 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
@@ -24,12 +24,14 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.store.CommitLogDispatcher;
+import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -77,24 +79,17 @@ public class MessageStoreWithFilterTest {
try {
StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
} catch (UnknownHostException e) {
- e.printStackTrace();
}
try {
BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
} catch (UnknownHostException e) {
- e.printStackTrace();
}
}
@Before
- public void init() {
+ public void init() throws Exception {
filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);
- try {
- master = gen(filterManager);
- } catch (Exception e) {
- e.printStackTrace();
- assertThat(true).isFalse();
- }
+ master = gen(filterManager);
}
@After
@@ -107,7 +102,7 @@ public class MessageStoreWithFilterTest {
public MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic(topic);
- msg.setTags("TAG1");
+ msg.setTags(System.currentTimeMillis() + "TAG");
msg.setKeys("Hello");
msg.setBody(msgBody);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
@@ -125,7 +120,7 @@ public class MessageStoreWithFilterTest {
}
public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize,
- boolean enableCqExt, int cqExtFileSize) {
+ boolean enableCqExt, int cqExtFileSize) {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize);
messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize);
@@ -155,9 +150,7 @@ public class MessageStoreWithFilterTest {
new MessageArrivingListener() {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
- long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
-// System.out.println(String.format("Msg coming: %s, %d, %d, %d",
-// topic, queueId, logicOffset, tagsCode));
+ long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
}
}
, brokerConfig);
@@ -166,8 +159,6 @@ public class MessageStoreWithFilterTest {
@Override
public void dispatch(DispatchRequest request) {
try {
-// System.out.println(String.format("offset:%d, bitMap:%s", request.getCommitLogOffset(),
-// BitsArray.create(request.getBitMap()).toString()));
} catch (Throwable e) {
e.printStackTrace();
}
@@ -183,7 +174,7 @@ public class MessageStoreWithFilterTest {
}
protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount,
- int msgCountPerTopic) throws Exception {
+ int msgCountPerTopic) throws Exception {
List<MessageExtBrokerInner> msgs = new ArrayList<MessageExtBrokerInner>();
for (int i = 0; i < topicCount; i++) {
String realTopic = topic + i;
@@ -229,22 +220,10 @@ public class MessageStoreWithFilterTest {
}
@Test
- public void testGetMessage_withFilterBitMapAndConsumerChanged() {
- List<MessageExtBrokerInner> msgs = null;
- try {
- msgs = putMsg(master, topicCount, msgPerTopic);
- } catch (Exception e) {
- e.printStackTrace();
- assertThat(true).isFalse();
- }
+ public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception {
+ List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic);
- // sleep to wait for consume queue has been constructed.
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- assertThat(true).isFalse();
- }
+ Thread.sleep(200);
// reset consumer;
String topic = "topic" + 0;
@@ -303,16 +282,10 @@ public class MessageStoreWithFilterTest {
}
@Test
- public void testGetMessage_withFilterBitMap() {
- List<MessageExtBrokerInner> msgs = null;
- try {
- msgs = putMsg(master, topicCount, msgPerTopic);
- // sleep to wait for consume queue has been constructed.
- Thread.sleep(200);
- } catch (Exception e) {
- e.printStackTrace();
- assertThat(true).isFalse();
- }
+ public void testGetMessage_withFilterBitMap() throws Exception {
+ List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic);
+
+ Thread.sleep(100);
for (int i = 0; i < topicCount; i++) {
String realTopic = topic + i;
@@ -369,4 +342,32 @@ public class MessageStoreWithFilterTest {
}
}
}
+
+ @Test
+ public void testGetMessage_withFilter_checkTagsCode() throws Exception {
+ putMsg(master, topicCount, msgPerTopic);
+
+ Thread.sleep(200);
+
+ for (int i = 0; i < topicCount; i++) {
+ String realTopic = topic + i;
+
+ GetMessageResult getMessageResult = master.getMessage("test", realTopic, queueId, 0, 10000,
+ new MessageFilter() {
+ @Override
+ public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+ if (tagsCode != null && tagsCode <= ConsumeQueueExt.MAX_ADDR) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+ return true;
+ }
+ });
+ assertThat(getMessageResult.getMessageCount()).isEqualTo(msgPerTopic);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ed4821ae/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 379162d..0bf0aa9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -569,6 +569,6 @@ public class ConsumeQueue {
* Check {@code tagsCode} is address of extend file or tags code.
*/
public boolean isExtAddr(long tagsCode) {
- return isExtReadEnable() && this.consumeQueueExt.isExtAddr(tagsCode);
+ return ConsumeQueueExt.isExtAddr(tagsCode);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ed4821ae/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
index a118cde..aeb2803 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
@@ -95,7 +95,7 @@ public class ConsumeQueueExt {
* Just test {@code address} is less than 0.
* </p>
*/
- public boolean isExtAddr(final long address) {
+ public static boolean isExtAddr(final long address) {
return address <= MAX_ADDR;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ed4821ae/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 95a017a..ffa8dbc 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -16,23 +16,6 @@
*/
package org.apache.rocketmq.store;
-import java.io.File;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
@@ -56,6 +39,24 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
import static org.apache.rocketmq.store.config.BrokerRole.SLAVE;
public class DefaultMessageStore implements MessageStore {
@@ -487,7 +488,7 @@ public class DefaultMessageStore implements MessageStore {
break;
}
- boolean extRet = false;
+ boolean extRet = false, isTagsCodeLegal = true;
if (consumeQueue.isExtAddr(tagsCode)) {
extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
if (extRet) {
@@ -496,11 +497,12 @@ public class DefaultMessageStore implements MessageStore {
// can't find ext content.Client will filter messages by tag also.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
tagsCode, offsetPy, sizePy, topic, group);
+ isTagsCodeLegal = false;
}
}
if (messageFilter != null
- && !messageFilter.isMatchedByConsumeQueue(tagsCode, extRet ? cqExtUnit : null)) {
+ && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}