You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/06/06 02:53:27 UTC
[rocketmq] branch develop updated: [ISSUE #4323] tracemessage example add the default NamesrvAddr (#4353)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1db9adb3e [ISSUE #4323] tracemessage example add the default NamesrvAddr (#4353)
1db9adb3e is described below
commit 1db9adb3e31733841a423299437b0251dc333b63
Author: 李晓双 Li Xiao Shuang <64...@qq.com>
AuthorDate: Mon Jun 6 10:53:21 2022 +0800
[ISSUE #4323] tracemessage example add the default NamesrvAddr (#4353)
* tracemessage example add the default NamesrvAddr
* update annotation
---
.../example/tracemessage/OpenTracingProducer.java | 30 ++++++++++++------
.../tracemessage/OpenTracingPushConsumer.java | 36 +++++++++++-----------
.../OpenTracingTransactionProducer.java | 29 +++++++++++------
.../example/tracemessage/TraceProducer.java | 22 ++++++++++---
.../example/tracemessage/TracePushConsumer.java | 25 +++++++--------
5 files changed, 88 insertions(+), 54 deletions(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java
index cd9ae2792..0c41b5b6f 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java
@@ -29,19 +29,29 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OpenTracingProducer {
+
+ public static final String PRODUCER_GROUP = "ProducerGroupName";
+ public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+ public static final String TOPIC = "TopicTest";
+ public static final String TAG = "TagA";
+ public static final String KEY = "OrderID188";
+
public static void main(String[] args) throws MQClientException {
Tracer tracer = initTracer();
- DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
+ DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
+
+ // Uncomment the following line while debugging; namesrvAddr should be set to your local address
+// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer));
producer.start();
try {
- Message msg = new Message("TopicTest",
- "TagA",
- "OrderID188",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ Message msg = new Message(TOPIC,
+ TAG,
+ KEY,
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
@@ -54,14 +64,14 @@ public class OpenTracingProducer {
private static Tracer initTracer() {
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv()
- .withType(ConstSampler.TYPE)
- .withParam(1);
+ .withType(ConstSampler.TYPE)
+ .withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv()
- .withLogSpans(true);
+ .withLogSpans(true);
Configuration config = new Configuration("rocketmq")
- .withSampler(samplerConfig)
- .withReporter(reporterConfig);
+ .withSampler(samplerConfig)
+ .withReporter(reporterConfig);
GlobalTracer.registerIfAbsent(config.getTracer());
return config.getTracer();
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
index 1d5d8a273..72295f366 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
@@ -22,34 +22,34 @@ import io.jaegertracing.internal.samplers.ConstSampler;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageOpenTracingHookImpl;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.MessageExt;
-
-import java.util.List;
public class OpenTracingPushConsumer {
+
+ public static final String CONSUMER_GROUP = "CID_JODIE_1";
+ public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+ public static final String TOPIC = "TopicTest";
+
public static void main(String[] args) throws InterruptedException, MQClientException {
Tracer tracer = initTracer();
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
+
+ // Uncomment the following line while debugging; namesrvAddr should be set to your local address
+// consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
- consumer.subscribe("TopicTest", "*");
+ consumer.subscribe(TOPIC, "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
+ consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Consumer Started.%n");
@@ -57,14 +57,14 @@ public class OpenTracingPushConsumer {
private static Tracer initTracer() {
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv()
- .withType(ConstSampler.TYPE)
- .withParam(1);
+ .withType(ConstSampler.TYPE)
+ .withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv()
- .withLogSpans(true);
+ .withLogSpans(true);
Configuration config = new Configuration("rocketmq")
- .withSampler(samplerConfig)
- .withReporter(reporterConfig);
+ .withSampler(samplerConfig)
+ .withReporter(reporterConfig);
GlobalTracer.registerIfAbsent(config.getTracer());
return config.getTracer();
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
index 514f3ceb7..dc05c72b1 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
@@ -35,10 +35,21 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
public class OpenTracingTransactionProducer {
+
+ public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
+ public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+ public static final String TOPIC = "TopicTest";
+ public static final String TAG = "Tag";
+ public static final String KEY = "KEY";
+ public static final int MESSAGE_COUNT = 100000;
+
public static void main(String[] args) throws MQClientException, InterruptedException {
Tracer tracer = initTracer();
- TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
+ TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
+
+ // Uncomment the following line while debugging; namesrvAddr should be set to your local address
+// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer));
producer.getDefaultMQProducerImpl().registerEndTransactionHook(new EndTransactionOpenTracingHookImpl(tracer));
@@ -56,15 +67,15 @@ public class OpenTracingTransactionProducer {
producer.start();
try {
- Message msg = new Message("TopicTest", "Tag", "KEY",
- "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ Message msg = new Message(TOPIC, TAG, KEY,
+ "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
- for (int i = 0; i < 100000; i++) {
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
Thread.sleep(1000);
}
producer.shutdown();
@@ -72,14 +83,14 @@ public class OpenTracingTransactionProducer {
private static Tracer initTracer() {
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv()
- .withType(ConstSampler.TYPE)
- .withParam(1);
+ .withType(ConstSampler.TYPE)
+ .withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv()
- .withLogSpans(true);
+ .withLogSpans(true);
Configuration config = new Configuration("rocketmq")
- .withSampler(samplerConfig)
- .withReporter(reporterConfig);
+ .withSampler(samplerConfig)
+ .withReporter(reporterConfig);
GlobalTracer.registerIfAbsent(config.getTracer());
return config.getTracer();
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
index fb8e37fd2..add6c4323 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
@@ -24,17 +24,28 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TraceProducer {
+
+ public static final String PRODUCER_GROUP = "ProducerGroupName";
+ public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+ public static final String TOPIC = "TopicTest";
+ public static final String TAG = "TagA";
+ public static final String KEY = "OrderID188";
+ public static final int MESSAGE_COUNT = 128;
+
public static void main(String[] args) throws MQClientException, InterruptedException {
- DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
+ DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true);
+
+ // Uncomment the following line while debugging; namesrvAddr should be set to your local address
+// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
producer.start();
- for (int i = 0; i < 128; i++)
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
try {
{
- Message msg = new Message("TopicTest",
- "TagA",
- "OrderID188",
+ Message msg = new Message(TOPIC,
+ TAG,
+ KEY,
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
@@ -43,6 +54,7 @@ public class TraceProducer {
} catch (Exception e) {
e.printStackTrace();
}
+ }
producer.shutdown();
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
index 473351963..a833ee1e4 100644
--- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
@@ -17,30 +17,31 @@
package org.apache.rocketmq.example.tracemessage;
-import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.MessageExt;
public class TracePushConsumer {
+
+ public static final String CONSUMER_GROUP = "ProducerGroupName";
+ public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
+ public static final String TOPIC = "TopicTest";
+
public static void main(String[] args) throws InterruptedException, MQClientException {
// Here,we use the default message track trace topic name
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
- consumer.subscribe("TopicTest", "*");
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, true);
+
+ // Uncomment the following line while debugging; namesrvAddr should be set to your local address
+// consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
+ consumer.subscribe(TOPIC, "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// Wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
+ consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Consumer Started.%n");