You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/24 07:48:58 UTC

[rocketmq] branch develop updated: polish filter example

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e204376  polish filter example
     new 11f6edd  Merge pull request #620 from RongtongJin/polish_filter_example
e204376 is described below

commit e20437657aadcda22f4f9cc745caf100887ab455
Author: Jin Rongtong <79...@qq.com>
AuthorDate: Sat Dec 22 11:18:42 2018 +0800

    polish filter example
---
 .../{SqlConsumer.java => SqlFilterConsumer.java}   | 29 ++++------
 .../{Producer.java => SqlFilterProducer.java}      | 36 ++++++------
 .../rocketmq/example/filter/SqlProducer.java       | 67 ----------------------
 .../{Consumer.java => TagFilterConsumer.java}      | 12 +---
 .../{Producer.java => TagFilterProducer.java}      | 32 +++++------
 5 files changed, 47 insertions(+), 129 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterConsumer.java
similarity index 74%
rename from example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
rename to example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterConsumer.java
index c41c9c1..8dd6d20 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterConsumer.java
@@ -17,28 +17,24 @@
 
 package org.apache.rocketmq.example.filter;
 
+import java.util.List;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 
-import java.util.List;
+public class SqlFilterConsumer {
+
+    public static void main(String[] args) throws Exception {
 
-public class SqlConsumer {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
 
-    public static void main(String[] args) {
-        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
-        try {
-            consumer.subscribe("TopicTest",
-                MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
-                    "and (a is not null and a between 0  3)"));
-        } catch (MQClientException e) {
-            e.printStackTrace();
-            return;
-        }
+        // Don't forget to set enablePropertyFilter=true in broker
+        consumer.subscribe("SqlFilterTest",
+            MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
+                "and (a is not null and a between 0 and 3)"));
 
         consumer.registerMessageListener(new MessageListenerConcurrently() {
 
@@ -50,12 +46,7 @@ public class SqlConsumer {
             }
         });
 
-        try {
-            consumer.start();
-        } catch (MQClientException e) {
-            e.printStackTrace();
-            return;
-        }
+        consumer.start();
         System.out.printf("Consumer Started.%n");
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterProducer.java
similarity index 59%
copy from example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
copy to example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterProducer.java
index 2a0da65..0018270 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterProducer.java
@@ -14,33 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.rocketmq.example.filter;
 
-import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 
-public class Producer {
-    public static void main(String[] args) throws MQClientException, InterruptedException {
-        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
+public class SqlFilterProducer {
+
+    public static void main(String[] args) throws Exception {
+
+        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+
         producer.start();
 
-        try {
-            for (int i = 0; i < 6000000; i++) {
-                Message msg = new Message("TopicFilter7",
-                    "TagA",
-                    "OrderID001",
-                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
-
-                msg.putUserProperty("SequenceId", String.valueOf(i));
-                SendResult sendResult = producer.send(msg);
-                System.out.printf("%s%n", sendResult);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
+        String[] tags = new String[] {"TagA", "TagB", "TagC"};
+
+        for (int i = 0; i < 10; i++) {
+            Message msg = new Message("SqlFilterTest",
+                tags[i % tags.length],
+                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
+            );
+            msg.putUserProperty("a", String.valueOf(i));
+
+            SendResult sendResult = producer.send(msg);
+            System.out.printf("%s%n", sendResult);
         }
+
         producer.shutdown();
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java
deleted file mode 100644
index 3f3a0e6..0000000
--- a/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.example.filter;
-
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-
-public class SqlProducer {
-
-    public static void main(String[] args) {
-        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
-        try {
-            producer.start();
-        } catch (MQClientException e) {
-            e.printStackTrace();
-            return;
-        }
-
-        for (int i = 0; i < 10; i++) {
-            try {
-                String tag;
-                int div = i % 3;
-                if (div == 0) {
-                    tag = "TagA";
-                } else if (div == 1) {
-                    tag = "TagB";
-                } else {
-                    tag = "TagC";
-                }
-                Message msg = new Message("TopicTest",
-                    tag,
-                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
-                );
-                msg.putUserProperty("a", String.valueOf(i));
-
-                SendResult sendResult = producer.send(msg);
-                System.out.printf("%s%n", sendResult);
-            } catch (Exception e) {
-                e.printStackTrace();
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e1) {
-                    e1.printStackTrace();
-                }
-            }
-        }
-        producer.shutdown();
-    }
-}
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/TagFilterConsumer.java
similarity index 81%
rename from example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
rename to example/src/main/java/org/apache/rocketmq/example/filter/TagFilterConsumer.java
index bb491ac..ba3723c 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/TagFilterConsumer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.example.filter;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -24,20 +23,15 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageExt;
 
-public class Consumer {
+public class TagFilterConsumer {
 
     public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
-        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
 
-        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-        File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile());
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
 
-        String filterCode = MixAll.file2String(classFile);
-        consumer.subscribe("TopicTest", "org.apache.rocketmq.example.filter.MessageFilterImpl",
-            filterCode);
+        consumer.subscribe("TagFilterTest", "TagA || TagC");
 
         consumer.registerMessageListener(new MessageListenerConcurrently() {
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java b/example/src/main/java/org/apache/rocketmq/example/filter/TagFilterProducer.java
similarity index 59%
rename from example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
rename to example/src/main/java/org/apache/rocketmq/example/filter/TagFilterProducer.java
index 2a0da65..b0a9e2d 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/TagFilterProducer.java
@@ -16,31 +16,29 @@
  */
 package org.apache.rocketmq.example.filter;
 
-import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 
-public class Producer {
-    public static void main(String[] args) throws MQClientException, InterruptedException {
-        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
+public class TagFilterProducer {
+
+    public static void main(String[] args) throws Exception {
+
+        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
         producer.start();
 
-        try {
-            for (int i = 0; i < 6000000; i++) {
-                Message msg = new Message("TopicFilter7",
-                    "TagA",
-                    "OrderID001",
-                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
-
-                msg.putUserProperty("SequenceId", String.valueOf(i));
-                SendResult sendResult = producer.send(msg);
-                System.out.printf("%s%n", sendResult);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
+        String[] tags = new String[] {"TagA", "TagB", "TagC"};
+
+        for (int i = 0; i < 60; i++) {
+            Message msg = new Message("TagFilterTest",
+                tags[i % tags.length],
+                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+
+            SendResult sendResult = producer.send(msg);
+            System.out.printf("%s%n", sendResult);
         }
+
         producer.shutdown();
     }
 }