You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2021/09/11 04:39:34 UTC
[rocketmq] branch develop updated: [#3326] fix send trace fail if
useTLS=true (#3325)
This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4b7a90a [#3326] fix send trace fail if useTLS=true (#3325)
4b7a90a is described below
commit 4b7a90a32ba4153d7cd6996b260c59895e2a7731
Author: yuz10 <84...@qq.com>
AuthorDate: Sat Sep 11 12:39:19 2021 +0800
[#3326] fix send trace fail if useTLS=true (#3325)
---
.../rocketmq/client/consumer/DefaultLitePullConsumer.java | 4 +++-
.../rocketmq/client/consumer/DefaultMQPushConsumer.java | 10 +++++++++-
.../apache/rocketmq/client/producer/DefaultMQProducer.java | 8 ++++++++
.../client/trace/DefaultMQConsumerWithTraceTest.java | 9 +++++++++
.../client/trace/DefaultMQLitePullConsumerWithTraceTest.java | 12 +++++++++++-
.../client/trace/DefaultMQProducerWithTraceTest.java | 10 ++++++++++
6 files changed, 50 insertions(+), 3 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index c54399a..74d6f34 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -535,7 +535,9 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
private void setTraceDispatcher() {
if (isEnableMsgTrace()) {
try {
- this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null);
+ AsyncTraceDispatcher traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null);
+ traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS());
+ this.traceDispatcher = traceDispatcher;
this.defaultLitePullConsumerImpl.registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index caf166d..58cf134 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -412,7 +412,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, withNamespace(newTopic), queueNum, 0);
}
-
+
+ @Override
+ public void setUseTLS(boolean useTLS) {
+ super.setUseTLS(useTLS);
+ if (traceDispatcher != null && traceDispatcher instanceof AsyncTraceDispatcher) {
+ ((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS);
+ }
+ }
+
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 1c4a931..0705935 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -263,6 +263,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
}
+ @Override
+ public void setUseTLS(boolean useTLS) {
+ super.setUseTLS(useTLS);
+ if (traceDispatcher != null && traceDispatcher instanceof AsyncTraceDispatcher) {
+ ((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS);
+ }
+ }
+
/**
* Start this producer instance. </p>
*
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index aec7d2c..976380b 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -69,6 +69,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -235,6 +236,14 @@ public class DefaultMQConsumerWithTraceTest {
assertThat(msg.getTopic()).isEqualTo(topic);
assertThat(msg.getBody()).isEqualTo(new byte[] {'a'});
}
+
+ @Test
+ public void testPushConsumerWithTraceTLS() {
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup", true);
+ consumer.setUseTLS(true);
+ AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) consumer.getTraceDispatcher();
+ Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS());
+ }
private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest();
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java
index 67ae194..ce3b832 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java
@@ -52,6 +52,7 @@ import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.RPCHook;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -146,6 +147,15 @@ public class DefaultMQLitePullConsumerWithTraceTest {
}
}
+ @Test
+ public void testLitePullConsumerWithTraceTLS() throws Exception {
+ DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("consumerGroup");
+ consumer.setUseTLS(true);
+ consumer.setEnableMsgTrace(true);
+ consumer.start();
+ AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) consumer.getTraceDispatcher();
+ Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS());
+ }
private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
@@ -302,4 +312,4 @@ public class DefaultMQLitePullConsumerWithTraceTest {
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
}
-}
\ No newline at end of file
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
index 62b3417..234e32e 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -143,6 +144,15 @@ public class DefaultMQProducerWithTraceTest {
}
+
+ @Test
+ public void testProducerWithTraceTLS() {
+ DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp, true);
+ producer.setUseTLS(true);
+ AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher();
+ Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS());
+ }
+
@After
public void terminate() {
producer.shutdown();