You are viewing a plain text version of this content. (The canonical link was not preserved in this plain-text copy.)
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/24 07:48:58 UTC
[rocketmq] branch develop updated: polish filter example
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e204376 polish filter example
new 11f6edd Merge pull request #620 from RongtongJin/polish_filter_example
e204376 is described below
commit e20437657aadcda22f4f9cc745caf100887ab455
Author: Jin Rongtong <79...@qq.com>
AuthorDate: Sat Dec 22 11:18:42 2018 +0800
polish filter example
---
.../{SqlConsumer.java => SqlFilterConsumer.java} | 29 ++++------
.../{Producer.java => SqlFilterProducer.java} | 36 ++++++------
.../rocketmq/example/filter/SqlProducer.java | 67 ----------------------
.../{Consumer.java => TagFilterConsumer.java} | 12 +---
.../{Producer.java => TagFilterProducer.java} | 32 +++++------
5 files changed, 47 insertions(+), 129 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterConsumer.java
similarity index 74%
rename from example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
rename to example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterConsumer.java
index c41c9c1..8dd6d20 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterConsumer.java
@@ -17,28 +17,24 @@
package org.apache.rocketmq.example.filter;
+import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
-import java.util.List;
+public class SqlFilterConsumer {
+
+ public static void main(String[] args) throws Exception {
-public class SqlConsumer {
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
- public static void main(String[] args) {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
- try {
- consumer.subscribe("TopicTest",
- MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
- "and (a is not null and a between 0 3)"));
- } catch (MQClientException e) {
- e.printStackTrace();
- return;
- }
+ // Don't forget to set enablePropertyFilter=true in broker
+ consumer.subscribe("SqlFilterTest",
+ MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
+ "and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@@ -50,12 +46,7 @@ public class SqlConsumer {
}
});
- try {
- consumer.start();
- } catch (MQClientException e) {
- e.printStackTrace();
- return;
- }
+ consumer.start();
System.out.printf("Consumer Started.%n");
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterProducer.java
similarity index 59%
copy from example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
copy to example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterProducer.java
index 2a0da65..0018270 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterProducer.java
@@ -14,33 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.example.filter;
-import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
-public class Producer {
- public static void main(String[] args) throws MQClientException, InterruptedException {
- DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
+public class SqlFilterProducer {
+
+ public static void main(String[] args) throws Exception {
+
+ DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+
producer.start();
- try {
- for (int i = 0; i < 6000000; i++) {
- Message msg = new Message("TopicFilter7",
- "TagA",
- "OrderID001",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
-
- msg.putUserProperty("SequenceId", String.valueOf(i));
- SendResult sendResult = producer.send(msg);
- System.out.printf("%s%n", sendResult);
- }
- } catch (Exception e) {
- e.printStackTrace();
+ String[] tags = new String[] {"TagA", "TagB", "TagC"};
+
+ for (int i = 0; i < 10; i++) {
+ Message msg = new Message("SqlFilterTest",
+ tags[i % tags.length],
+ ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
+ );
+ msg.putUserProperty("a", String.valueOf(i));
+
+ SendResult sendResult = producer.send(msg);
+ System.out.printf("%s%n", sendResult);
}
+
producer.shutdown();
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java
deleted file mode 100644
index 3f3a0e6..0000000
--- a/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.example.filter;
-
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-
-public class SqlProducer {
-
- public static void main(String[] args) {
- DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
- try {
- producer.start();
- } catch (MQClientException e) {
- e.printStackTrace();
- return;
- }
-
- for (int i = 0; i < 10; i++) {
- try {
- String tag;
- int div = i % 3;
- if (div == 0) {
- tag = "TagA";
- } else if (div == 1) {
- tag = "TagB";
- } else {
- tag = "TagC";
- }
- Message msg = new Message("TopicTest",
- tag,
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
- );
- msg.putUserProperty("a", String.valueOf(i));
-
- SendResult sendResult = producer.send(msg);
- System.out.printf("%s%n", sendResult);
- } catch (Exception e) {
- e.printStackTrace();
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- }
- }
- producer.shutdown();
- }
-}
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/TagFilterConsumer.java
similarity index 81%
rename from example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
rename to example/src/main/java/org/apache/rocketmq/example/filter/TagFilterConsumer.java
index bb491ac..ba3723c 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/TagFilterConsumer.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.example.filter;
-import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -24,20 +23,15 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt;
-public class Consumer {
+public class TagFilterConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile());
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
- String filterCode = MixAll.file2String(classFile);
- consumer.subscribe("TopicTest", "org.apache.rocketmq.example.filter.MessageFilterImpl",
- filterCode);
+ consumer.subscribe("TagFilterTest", "TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java b/example/src/main/java/org/apache/rocketmq/example/filter/TagFilterProducer.java
similarity index 59%
rename from example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
rename to example/src/main/java/org/apache/rocketmq/example/filter/TagFilterProducer.java
index 2a0da65..b0a9e2d 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/TagFilterProducer.java
@@ -16,31 +16,29 @@
*/
package org.apache.rocketmq.example.filter;
-import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
-public class Producer {
- public static void main(String[] args) throws MQClientException, InterruptedException {
- DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
+public class TagFilterProducer {
+
+ public static void main(String[] args) throws Exception {
+
+ DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
- try {
- for (int i = 0; i < 6000000; i++) {
- Message msg = new Message("TopicFilter7",
- "TagA",
- "OrderID001",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
-
- msg.putUserProperty("SequenceId", String.valueOf(i));
- SendResult sendResult = producer.send(msg);
- System.out.printf("%s%n", sendResult);
- }
- } catch (Exception e) {
- e.printStackTrace();
+ String[] tags = new String[] {"TagA", "TagB", "TagC"};
+
+ for (int i = 0; i < 60; i++) {
+ Message msg = new Message("TagFilterTest",
+ tags[i % tags.length],
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+
+ SendResult sendResult = producer.send(msg);
+ System.out.printf("%s%n", sendResult);
}
+
producer.shutdown();
}
}