You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/06/06 02:53:27 UTC

[rocketmq] branch develop updated: [ISSUE #4323] tracemessage example add the default NamesrvAddr (#4353)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1db9adb3e [ISSUE #4323] tracemessage example add the default NamesrvAddr (#4353)
1db9adb3e is described below

commit 1db9adb3e31733841a423299437b0251dc333b63
Author: 李晓双 Li Xiao Shuang <64...@qq.com>
AuthorDate: Mon Jun 6 10:53:21 2022 +0800

    [ISSUE #4323] tracemessage example add the default NamesrvAddr (#4353)
    
    * tracemessage example add the default NamesrvAddr
    
    * update annotation
---
 .../example/tracemessage/OpenTracingProducer.java  | 30 ++++++++++++------
 .../tracemessage/OpenTracingPushConsumer.java      | 36 +++++++++++-----------
 .../OpenTracingTransactionProducer.java            | 29 +++++++++++------
 .../example/tracemessage/TraceProducer.java        | 22 ++++++++++---
 .../example/tracemessage/TracePushConsumer.java    | 25 +++++++--------
 5 files changed, 88 insertions(+), 54 deletions(-)

diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java
index cd9ae2792..0c41b5b6f 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java
@@ -29,19 +29,29 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 
 public class OpenTracingProducer {
+
+    public static final String PRODUCER_GROUP = "ProducerGroupName";
+    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+    public static final String TOPIC = "TopicTest";
+    public static final String TAG = "TagA";
+    public static final String KEY = "OrderID188";
+
     public static void main(String[] args) throws MQClientException {
 
         Tracer tracer = initTracer();
 
-        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
+        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
+
+        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
+//        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
         producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer));
         producer.start();
 
         try {
-            Message msg = new Message("TopicTest",
-                    "TagA",
-                    "OrderID188",
-                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+            Message msg = new Message(TOPIC,
+                TAG,
+                KEY,
+                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
             SendResult sendResult = producer.send(msg);
             System.out.printf("%s%n", sendResult);
 
@@ -54,14 +64,14 @@ public class OpenTracingProducer {
 
     private static Tracer initTracer() {
         Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv()
-                .withType(ConstSampler.TYPE)
-                .withParam(1);
+            .withType(ConstSampler.TYPE)
+            .withParam(1);
         Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv()
-                .withLogSpans(true);
+            .withLogSpans(true);
 
         Configuration config = new Configuration("rocketmq")
-                .withSampler(samplerConfig)
-                .withReporter(reporterConfig);
+            .withSampler(samplerConfig)
+            .withReporter(reporterConfig);
         GlobalTracer.registerIfAbsent(config.getTracer());
         return config.getTracer();
     }
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
index 1d5d8a273..72295f366 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
@@ -22,34 +22,34 @@ import io.jaegertracing.internal.samplers.ConstSampler;
 import io.opentracing.Tracer;
 import io.opentracing.util.GlobalTracer;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.trace.hook.ConsumeMessageOpenTracingHookImpl;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.MessageExt;
-
-import java.util.List;
 
 public class OpenTracingPushConsumer {
+
+    public static final String CONSUMER_GROUP = "CID_JODIE_1";
+    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+    public static final String TOPIC = "TopicTest";
+
     public static void main(String[] args) throws InterruptedException, MQClientException {
         Tracer tracer = initTracer();
 
-        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
+
+        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
+//        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
         consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
 
-        consumer.subscribe("TopicTest", "*");
+        consumer.subscribe(TOPIC, "*");
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
         consumer.setConsumeTimestamp("20181109221800");
-        consumer.registerMessageListener(new MessageListenerConcurrently() {
-
-            @Override
-            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
-                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
-                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-            }
+        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         });
         consumer.start();
         System.out.printf("Consumer Started.%n");
@@ -57,14 +57,14 @@ public class OpenTracingPushConsumer {
 
     private static Tracer initTracer() {
         Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv()
-                .withType(ConstSampler.TYPE)
-                .withParam(1);
+            .withType(ConstSampler.TYPE)
+            .withParam(1);
         Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv()
-                .withLogSpans(true);
+            .withLogSpans(true);
 
         Configuration config = new Configuration("rocketmq")
-                .withSampler(samplerConfig)
-                .withReporter(reporterConfig);
+            .withSampler(samplerConfig)
+            .withReporter(reporterConfig);
         GlobalTracer.registerIfAbsent(config.getTracer());
         return config.getTracer();
     }
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
index 514f3ceb7..dc05c72b1 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
@@ -35,10 +35,21 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
 import java.io.UnsupportedEncodingException;
 
 public class OpenTracingTransactionProducer {
+
+    public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
+    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+    public static final String TOPIC = "TopicTest";
+    public static final String TAG = "Tag";
+    public static final String KEY = "KEY";
+    public static final int MESSAGE_COUNT = 100000;
+
     public static void main(String[] args) throws MQClientException, InterruptedException {
         Tracer tracer = initTracer();
 
-        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
+        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
+
+        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
+//        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
         producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer));
         producer.getDefaultMQProducerImpl().registerEndTransactionHook(new EndTransactionOpenTracingHookImpl(tracer));
 
@@ -56,15 +67,15 @@ public class OpenTracingTransactionProducer {
         producer.start();
 
         try {
-            Message msg = new Message("TopicTest", "Tag", "KEY",
-                    "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
+            Message msg = new Message(TOPIC, TAG, KEY,
+                "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
             SendResult sendResult = producer.sendMessageInTransaction(msg, null);
             System.out.printf("%s%n", sendResult);
         } catch (MQClientException | UnsupportedEncodingException e) {
             e.printStackTrace();
         }
 
-        for (int i = 0; i < 100000; i++) {
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
             Thread.sleep(1000);
         }
         producer.shutdown();
@@ -72,14 +83,14 @@ public class OpenTracingTransactionProducer {
 
     private static Tracer initTracer() {
         Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv()
-                .withType(ConstSampler.TYPE)
-                .withParam(1);
+            .withType(ConstSampler.TYPE)
+            .withParam(1);
         Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv()
-                .withLogSpans(true);
+            .withLogSpans(true);
 
         Configuration config = new Configuration("rocketmq")
-                .withSampler(samplerConfig)
-                .withReporter(reporterConfig);
+            .withSampler(samplerConfig)
+            .withReporter(reporterConfig);
         GlobalTracer.registerIfAbsent(config.getTracer());
         return config.getTracer();
     }
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
index fb8e37fd2..add6c4323 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
@@ -24,17 +24,28 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 
 public class TraceProducer {
+
+    public static final String PRODUCER_GROUP = "ProducerGroupName";
+    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+    public static final String TOPIC = "TopicTest";
+    public static final String TAG = "TagA";
+    public static final String KEY = "OrderID188";
+    public static final int MESSAGE_COUNT = 128;
+
     public static void main(String[] args) throws MQClientException, InterruptedException {
 
-        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
+        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true);
+
+        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
+//        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
         producer.start();
 
-        for (int i = 0; i < 128; i++)
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
             try {
                 {
-                    Message msg = new Message("TopicTest",
-                        "TagA",
-                        "OrderID188",
+                    Message msg = new Message(TOPIC,
+                        TAG,
+                        KEY,
                         "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                     SendResult sendResult = producer.send(msg);
                     System.out.printf("%s%n", sendResult);
@@ -43,6 +54,7 @@ public class TraceProducer {
             } catch (Exception e) {
                 e.printStackTrace();
             }
+        }
 
         producer.shutdown();
     }
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
index 473351963..a833ee1e4 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
@@ -17,30 +17,31 @@
 
 package org.apache.rocketmq.example.tracemessage;
 
-import java.util.List;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.MessageExt;
 
 public class TracePushConsumer {
+
+    public static final String CONSUMER_GROUP = "ProducerGroupName";
+    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+    public static final String TOPIC = "TopicTest";
+
     public static void main(String[] args) throws InterruptedException, MQClientException {
         // Here,we use the default message track trace topic name
-        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
-        consumer.subscribe("TopicTest", "*");
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, true);
+
+        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
+//        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
+        consumer.subscribe(TOPIC, "*");
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         // Wrong time format 2017_0422_221800
         consumer.setConsumeTimestamp("20181109221800");
-        consumer.registerMessageListener(new MessageListenerConcurrently() {
-
-            @Override
-            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
-                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
-                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-            }
+        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         });
         consumer.start();
         System.out.printf("Consumer Started.%n");