You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:23:39 UTC

[41/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-220] Add IT test for Filter By SQL 92, closes apache/incubator-rocketmq#114

[ROCKETMQ-220] Add IT test for Filter By SQL 92, closes
apache/incubator-rocketmq#114


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/605103ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/605103ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/605103ef

Branch: refs/heads/develop
Commit: 605103efd70cac652a0a65a447428c41459ffaa1
Parents: 982770b
Author: dongeforever <zh...@yeah.net>
Authored: Thu Jun 8 11:12:26 2017 +0800
Committer: dongeforever <do...@apache.org>
Committed: Thu Jun 22 10:29:41 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/broker/BrokerController.java       |  2 +-
 .../rocketmq/example/filter/SqlConsumer.java    |  1 -
 .../test/client/rmq/RMQSqlConsumer.java         | 42 +++++++++++
 .../rocketmq/test/factory/ConsumerFactory.java  | 12 ++++
 .../test/listener/AbstractListener.java         | 22 ++++++
 .../rocketmq/test/base/IntegrationTestBase.java |  1 +
 .../client/consumer/filter/SqlFilterIT.java     | 74 ++++++++++++++++++++
 7 files changed, 152 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index bacd25c..1416ebf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -421,7 +421,7 @@ public class BrokerController {
 
         this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
 
         /**
          * ConsumerManageProcessor

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
index 9a3b813..52c2474 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
@@ -31,7 +31,6 @@ public class SqlConsumer {
 
     public static void main(String[] args) {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
-
         try {
             consumer.subscribe("TopicTest",
                 MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
new file mode 100644
index 0000000..cb0210f
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.test.listener.AbstractListener;
+
+public class RMQSqlConsumer extends RMQNormalConsumer {
+    private static Logger logger = Logger.getLogger(RMQSqlConsumer.class);
+    private MessageSelector selector;
+    public RMQSqlConsumer(String nsAddr, String topic, MessageSelector selector,
+        String consumerGroup, AbstractListener listener) {
+        super(nsAddr, topic, "*", consumerGroup, listener);
+        this.selector = selector;
+    }
+
+    @Override
+    public void create() {
+        super.create();
+        try {
+            consumer.subscribe(topic, selector);
+        } catch (Exception e) {
+            logger.error("Subscribe Sql Errored", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
index b5b3fdd..7dd747f 100644
--- a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
@@ -17,8 +17,10 @@
 
 package org.apache.rocketmq.test.factory;
 
+import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
 import org.apache.rocketmq.test.listener.AbstractListener;
 
 public class ConsumerFactory {
@@ -42,4 +44,14 @@ public class ConsumerFactory {
         consumer.start();
         return consumer;
     }
+
+    public static RMQSqlConsumer getRMQSqlConsumer(String nsAddr, String consumerGroup,
+        String topic, MessageSelector selector,
+        AbstractListener listner) {
+        RMQSqlConsumer consumer = new RMQSqlConsumer(nsAddr, topic, selector,
+            consumerGroup, listner);
+        consumer.create();
+        consumer.start();
+        return consumer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
index 974434a..14da397 100644
--- a/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
+++ b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
@@ -95,6 +95,28 @@ public class AbstractListener extends MQCollector implements MessageListener {
         return sendMsgs;
     }
 
+    public long waitForMessageConsume(int size,
+        int timeoutMills) {
+
+        long curTime = System.currentTimeMillis();
+        while (true) {
+            if (msgBodys.getDataSize() >= size) {
+                break;
+            }
+            if (System.currentTimeMillis() - curTime >= timeoutMills) {
+                logger.error(String.format("timeout but  [%s]  not recv all send messages!",
+                    listnerName));
+                break;
+            } else {
+                logger.info(String.format("[%s] still [%s] msg not recv!", listnerName,
+                    size - msgBodys.getDataSize()));
+                TestUtil.waitForMonment(500);
+            }
+        }
+
+        return msgBodys.getDataSize();
+    }
+
     public void waitForMessageConsume(Map<Object, Object> sendMsgIndex, int timeoutMills) {
         Collection<Object> notRecvMsgs = waitForMessageConsume(sendMsgIndex.keySet(), timeoutMills);
         for (Object object : notRecvMsgs) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 9805eba..07af7aa 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -127,6 +127,7 @@ public class IntegrationTestBase {
         brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
         brokerConfig.setBrokerIP1("127.0.0.1");
         brokerConfig.setNamesrvAddr(nsAddr);
+        brokerConfig.setEnablePropertyFilter(true);
         storeConfig.setStorePathRootDir(baseDir);
         storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
         storeConfig.setHaListenPort(8000 + random.nextInt(1000));

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
new file mode 100644
index 0000000..7eef2ab
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.filter;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
+import org.apache.rocketmq.test.client.consumer.broadcast.normal.NormalMsgTwoSameGroupConsumerIT;
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
+import org.apache.rocketmq.test.factory.ConsumerFactory;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class SqlFilterIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(SqlFilterIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testFilterConsumer() throws Exception {
+        int msgSize = 16;
+
+        String group = initConsumerGroup();
+        MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))");
+        RMQSqlConsumer consumer = ConsumerFactory.getRMQSqlConsumer(nsAddr, group, topic, selector, new RMQNormalListner(group + "_1"));
+        Thread.sleep(3000);
+        producer.send("TagA", msgSize);
+        producer.send("TagB", msgSize);
+        producer.send("TagC", msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(msgSize * 2, consumeTime);
+        assertThat(producer.getAllMsgBody())
+            .containsAllIn(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()));
+
+        assertThat(consumer.getListner().getAllMsgBody().size()).isEqualTo(msgSize * 2);
+    }
+}