You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vs...@apache.org on 2017/08/29 15:32:33 UTC

incubator-rocketmq git commit: Author: vsair

Repository: incubator-rocketmq
Updated Branches:
  refs/heads/develop 1b853e81a -> ed4821ae0


Author: vsair <li...@gmail.com>

Closes #160 from vsair/ROCKETMQ-284.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/ed4821ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/ed4821ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/ed4821ae

Branch: refs/heads/develop
Commit: ed4821ae0d0108bcc02098f59ccafe18f0405db4
Parents: 1b853e8
Author: vsair <li...@gmail.com>
Authored: Tue Aug 29 23:32:21 2017 +0800
Committer: vsair <vs...@apache.org>
Committed: Tue Aug 29 23:32:21 2017 +0800

----------------------------------------------------------------------
 .../broker/filter/ExpressionMessageFilter.java  |  2 +-
 .../filter/MessageStoreWithFilterTest.java      | 85 ++++++++++----------
 .../org/apache/rocketmq/store/ConsumeQueue.java |  2 +-
 .../apache/rocketmq/store/ConsumeQueueExt.java  |  2 +-
 .../rocketmq/store/DefaultMessageStore.java     | 40 ++++-----
 5 files changed, 67 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ed4821ae/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
index 2f94de2..64c28ec 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
@@ -70,7 +70,7 @@ public class ExpressionMessageFilter implements MessageFilter {
         // by tags code.
         if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
 
-            if (tagsCode == null || tagsCode < 0L) {
+            if (tagsCode == null) {
                 return true;
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ed4821ae/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
index 7978942..e544d90 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
@@ -24,12 +24,14 @@ import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.store.CommitLogDispatcher;
+import org.apache.rocketmq.store.ConsumeQueueExt;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageArrivingListener;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -77,24 +79,17 @@ public class MessageStoreWithFilterTest {
         try {
             StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
         } catch (UnknownHostException e) {
-            e.printStackTrace();
         }
         try {
             BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
         } catch (UnknownHostException e) {
-            e.printStackTrace();
         }
     }
 
     @Before
-    public void init() {
+    public void init() throws Exception {
         filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);
-        try {
-            master = gen(filterManager);
-        } catch (Exception e) {
-            e.printStackTrace();
-            assertThat(true).isFalse();
-        }
+        master = gen(filterManager);
     }
 
     @After
@@ -107,7 +102,7 @@ public class MessageStoreWithFilterTest {
     public MessageExtBrokerInner buildMessage() {
         MessageExtBrokerInner msg = new MessageExtBrokerInner();
         msg.setTopic(topic);
-        msg.setTags("TAG1");
+        msg.setTags(System.currentTimeMillis() + "TAG");
         msg.setKeys("Hello");
         msg.setBody(msgBody);
         msg.setKeys(String.valueOf(System.currentTimeMillis()));
@@ -125,7 +120,7 @@ public class MessageStoreWithFilterTest {
     }
 
     public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize,
-        boolean enableCqExt, int cqExtFileSize) {
+                                               boolean enableCqExt, int cqExtFileSize) {
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
         messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize);
         messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize);
@@ -155,9 +150,7 @@ public class MessageStoreWithFilterTest {
             new MessageArrivingListener() {
                 @Override
                 public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
-                    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
-//                    System.out.println(String.format("Msg coming: %s, %d, %d, %d",
-//                        topic, queueId, logicOffset, tagsCode));
+                                     long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
                 }
             }
             , brokerConfig);
@@ -166,8 +159,6 @@ public class MessageStoreWithFilterTest {
             @Override
             public void dispatch(DispatchRequest request) {
                 try {
-//                    System.out.println(String.format("offset:%d, bitMap:%s", request.getCommitLogOffset(),
-//                        BitsArray.create(request.getBitMap()).toString()));
                 } catch (Throwable e) {
                     e.printStackTrace();
                 }
@@ -183,7 +174,7 @@ public class MessageStoreWithFilterTest {
     }
 
     protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount,
-        int msgCountPerTopic) throws Exception {
+                                                 int msgCountPerTopic) throws Exception {
         List<MessageExtBrokerInner> msgs = new ArrayList<MessageExtBrokerInner>();
         for (int i = 0; i < topicCount; i++) {
             String realTopic = topic + i;
@@ -229,22 +220,10 @@ public class MessageStoreWithFilterTest {
     }
 
     @Test
-    public void testGetMessage_withFilterBitMapAndConsumerChanged() {
-        List<MessageExtBrokerInner> msgs = null;
-        try {
-            msgs = putMsg(master, topicCount, msgPerTopic);
-        } catch (Exception e) {
-            e.printStackTrace();
-            assertThat(true).isFalse();
-        }
+    public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception {
+        List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic);
 
-        // sleep to wait for consume queue has been constructed.
-        try {
-            Thread.sleep(200);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-            assertThat(true).isFalse();
-        }
+        Thread.sleep(200);
 
         // reset consumer;
         String topic = "topic" + 0;
@@ -303,16 +282,10 @@ public class MessageStoreWithFilterTest {
     }
 
     @Test
-    public void testGetMessage_withFilterBitMap() {
-        List<MessageExtBrokerInner> msgs = null;
-        try {
-            msgs = putMsg(master, topicCount, msgPerTopic);
-            // sleep to wait for consume queue has been constructed.
-            Thread.sleep(200);
-        } catch (Exception e) {
-            e.printStackTrace();
-            assertThat(true).isFalse();
-        }
+    public void testGetMessage_withFilterBitMap() throws Exception {
+        List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic);
+
+        Thread.sleep(100);
 
         for (int i = 0; i < topicCount; i++) {
             String realTopic = topic + i;
@@ -369,4 +342,32 @@ public class MessageStoreWithFilterTest {
             }
         }
     }
+
+    @Test
+    public void testGetMessage_withFilter_checkTagsCode() throws Exception {
+        putMsg(master, topicCount, msgPerTopic);
+
+        Thread.sleep(200);
+
+        for (int i = 0; i < topicCount; i++) {
+            String realTopic = topic + i;
+
+            GetMessageResult getMessageResult = master.getMessage("test", realTopic, queueId, 0, 10000,
+                new MessageFilter() {
+                    @Override
+                    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                        if (tagsCode != null && tagsCode <= ConsumeQueueExt.MAX_ADDR) {
+                            return false;
+                        }
+                        return true;
+                    }
+
+                    @Override
+                    public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+                        return true;
+                    }
+                });
+            assertThat(getMessageResult.getMessageCount()).isEqualTo(msgPerTopic);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ed4821ae/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 379162d..0bf0aa9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -569,6 +569,6 @@ public class ConsumeQueue {
      * Check {@code tagsCode} is address of extend file or tags code.
      */
     public boolean isExtAddr(long tagsCode) {
-        return isExtReadEnable() && this.consumeQueueExt.isExtAddr(tagsCode);
+        return ConsumeQueueExt.isExtAddr(tagsCode);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ed4821ae/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
index a118cde..aeb2803 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
@@ -95,7 +95,7 @@ public class ConsumeQueueExt {
      * Just test {@code address} is less than 0.
      * </p>
      */
-    public boolean isExtAddr(final long address) {
+    public static boolean isExtAddr(final long address) {
         return address <= MAX_ADDR;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ed4821ae/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 95a017a..ffa8dbc 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -16,23 +16,6 @@
  */
 package org.apache.rocketmq.store;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceThread;
@@ -56,6 +39,24 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 import static org.apache.rocketmq.store.config.BrokerRole.SLAVE;
 
 public class DefaultMessageStore implements MessageStore {
@@ -487,7 +488,7 @@ public class DefaultMessageStore implements MessageStore {
                                 break;
                             }
 
-                            boolean extRet = false;
+                            boolean extRet = false, isTagsCodeLegal = true;
                             if (consumeQueue.isExtAddr(tagsCode)) {
                                 extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
                                 if (extRet) {
@@ -496,11 +497,12 @@ public class DefaultMessageStore implements MessageStore {
                                     // can't find ext content.Client will filter messages by tag also.
                                     log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
                                         tagsCode, offsetPy, sizePy, topic, group);
+                                    isTagsCodeLegal = false;
                                 }
                             }
 
                             if (messageFilter != null
-                                && !messageFilter.isMatchedByConsumeQueue(tagsCode, extRet ? cqExtUnit : null)) {
+                                && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                 if (getResult.getBufferTotalSize() == 0) {
                                     status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                 }