You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:23:39 UTC
[41/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-220] Add IT
test for Filter By SQL 92, closes apache/incubator-rocketmq#114
[ROCKETMQ-220] Add IT test for Filter By SQL 92, closes
apache/incubator-rocketmq#114
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/605103ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/605103ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/605103ef
Branch: refs/heads/develop
Commit: 605103efd70cac652a0a65a447428c41459ffaa1
Parents: 982770b
Author: dongeforever <zh...@yeah.net>
Authored: Thu Jun 8 11:12:26 2017 +0800
Committer: dongeforever <do...@apache.org>
Committed: Thu Jun 22 10:29:41 2017 +0800
----------------------------------------------------------------------
.../rocketmq/broker/BrokerController.java | 2 +-
.../rocketmq/example/filter/SqlConsumer.java | 1 -
.../test/client/rmq/RMQSqlConsumer.java | 42 +++++++++++
.../rocketmq/test/factory/ConsumerFactory.java | 12 ++++
.../test/listener/AbstractListener.java | 22 ++++++
.../rocketmq/test/base/IntegrationTestBase.java | 1 +
.../client/consumer/filter/SqlFilterIT.java | 74 ++++++++++++++++++++
7 files changed, 152 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index bacd25c..1416ebf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -421,7 +421,7 @@ public class BrokerController {
this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
- this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
+ this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
/**
* ConsumerManageProcessor
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
index 9a3b813..52c2474 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
@@ -31,7 +31,6 @@ public class SqlConsumer {
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
-
try {
consumer.subscribe("TopicTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
new file mode 100644
index 0000000..cb0210f
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.test.listener.AbstractListener;
+
+public class RMQSqlConsumer extends RMQNormalConsumer { // Test consumer that additionally subscribes to its topic with a MessageSelector.
+ private static Logger logger = Logger.getLogger(RMQSqlConsumer.class); // used only to report a failed subscribe in create()
+ private MessageSelector selector; // selector handed to consumer.subscribe(...) in create()
+ public RMQSqlConsumer(String nsAddr, String topic, MessageSelector selector,
+ String consumerGroup, AbstractListener listener) {
+ super(nsAddr, topic, "*", consumerGroup, listener); // "*" is the third super argument; presumably the subscription expression (match all) - confirm in RMQNormalConsumer
+ this.selector = selector;
+ }
+
+ @Override
+ public void create() {
+ super.create(); // parent setup runs first; 'consumer' and 'topic' are not declared in this class, so presumably inherited
+ try {
+ consumer.subscribe(topic, selector); // subscribe using the selector supplied at construction
+ } catch (Exception e) { // NOTE(review): any failure is only logged, so create() still returns normally
+ logger.error("Subscribe Sql Errored", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
index b5b3fdd..7dd747f 100644
--- a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
@@ -17,8 +17,10 @@
package org.apache.rocketmq.test.factory;
+import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
import org.apache.rocketmq.test.listener.AbstractListener;
public class ConsumerFactory {
@@ -42,4 +44,14 @@ public class ConsumerFactory {
consumer.start();
return consumer;
}
+
+ public static RMQSqlConsumer getRMQSqlConsumer(String nsAddr, String consumerGroup, // Builds an RMQSqlConsumer, then calls create() and start() on it before returning it.
+ String topic, MessageSelector selector,
+ AbstractListener listner) {
+ RMQSqlConsumer consumer = new RMQSqlConsumer(nsAddr, topic, selector, // note: constructor order is (nsAddr, topic, selector, consumerGroup, listener), unlike this method's parameter order
+ consumerGroup, listner);
+ consumer.create(); // create() must precede start(): it performs the selector subscription
+ consumer.start();
+ return consumer; // the same instance, after create() and start() have been invoked
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
index 974434a..14da397 100644
--- a/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
+++ b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
@@ -95,6 +95,28 @@ public class AbstractListener extends MQCollector implements MessageListener {
return sendMsgs;
}
+ public long waitForMessageConsume(int size, // Polls until msgBodys holds at least 'size' entries or the timeout elapses; returns the count held at that point.
+ int timeoutMills) { // compared against System.currentTimeMillis() differences, i.e. milliseconds
+
+ long curTime = System.currentTimeMillis(); // start time of the wait, used as the timeout reference
+ while (true) {
+ if (msgBodys.getDataSize() >= size) { // enough messages recorded: stop waiting
+ break;
+ }
+ if (System.currentTimeMillis() - curTime >= timeoutMills) { // deadline reached: log and give up without throwing
+ logger.error(String.format("timeout but [%s] not recv all send messages!",
+ listnerName));
+ break;
+ } else {
+ logger.info(String.format("[%s] still [%s] msg not recv!", listnerName,
+ size - msgBodys.getDataSize())); // number of messages still outstanding
+ TestUtil.waitForMonment(500); // pause before re-checking; presumably 500 ms - confirm in TestUtil
+ }
+ }
+
+ return msgBodys.getDataSize(); // may be smaller than 'size' if the wait timed out; callers must check
+ }
+
public void waitForMessageConsume(Map<Object, Object> sendMsgIndex, int timeoutMills) {
Collection<Object> notRecvMsgs = waitForMessageConsume(sendMsgIndex.keySet(), timeoutMills);
for (Object object : notRecvMsgs) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 9805eba..07af7aa 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -127,6 +127,7 @@ public class IntegrationTestBase {
brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
brokerConfig.setBrokerIP1("127.0.0.1");
brokerConfig.setNamesrvAddr(nsAddr);
+ brokerConfig.setEnablePropertyFilter(true);
storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
storeConfig.setHaListenPort(8000 + random.nextInt(1000));
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
new file mode 100644
index 0000000..7eef2ab
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.filter;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
+import org.apache.rocketmq.test.client.consumer.broadcast.normal.NormalMsgTwoSameGroupConsumerIT;
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
+import org.apache.rocketmq.test.factory.ConsumerFactory;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class SqlFilterIT extends BaseConf { // Integration test: a consumer subscribed with a SQL selector on TAGS should receive only the TagA/TagB messages.
+ private static Logger logger = Logger.getLogger(SqlFilterIT.class);
+ private RMQNormalProducer producer = null; // assigned in setUp() for each test
+ private String topic = null; // assigned in setUp() from initTopic()
+
+ @Before
+ public void setUp() {
+ topic = initTopic(); // initTopic() is not declared here; presumably provided by BaseConf
+ logger.info(String.format("use topic: %s;", topic));
+ producer = getProducer(nsAddr, topic);
+ }
+
+ @After
+ public void tearDown() {
+ super.shutDown(); // presumably stops the clients created during the test - confirm in BaseConf
+ }
+
+ @Test
+ public void testFilterConsumer() throws Exception {
+ int msgSize = 16; // number of messages sent per tag
+
+ String group = initConsumerGroup();
+ MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))"); // accepts only messages tagged TagA or TagB
+ RMQSqlConsumer consumer = ConsumerFactory.getRMQSqlConsumer(nsAddr, group, topic, selector, new RMQNormalListner(group + "_1"));
+ Thread.sleep(3000); // fixed 3 s pause before sending; presumably lets the subscription take effect - TODO confirm
+ producer.send("TagA", msgSize);
+ producer.send("TagB", msgSize);
+ producer.send("TagC", msgSize); // TagC is not in the selector's IN list
+ Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size()); // sanity check on the producer side before judging the consumer
+ consumer.getListner().waitForMessageConsume(msgSize * 2, consumeTime); // return value ignored; the received count is asserted below
+ assertThat(producer.getAllMsgBody()) // every element returned by getFilterdMessage(...) must be among the sent bodies
+ .containsAllIn(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+ consumer.getListner().getAllMsgBody()));
+
+ assertThat(consumer.getListner().getAllMsgBody().size()).isEqualTo(msgSize * 2); // total received must equal the TagA + TagB count
+ }
+}