You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2021/09/11 04:39:34 UTC

[rocketmq] branch develop updated: [#3326] fix send trace fail if useTLS=true (#3325)

This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4b7a90a  [#3326] fix send trace fail if useTLS=true (#3325)
4b7a90a is described below

commit 4b7a90a32ba4153d7cd6996b260c59895e2a7731
Author: yuz10 <84...@qq.com>
AuthorDate: Sat Sep 11 12:39:19 2021 +0800

    [#3326] fix send trace fail if useTLS=true (#3325)
---
 .../rocketmq/client/consumer/DefaultLitePullConsumer.java    |  4 +++-
 .../rocketmq/client/consumer/DefaultMQPushConsumer.java      | 10 +++++++++-
 .../apache/rocketmq/client/producer/DefaultMQProducer.java   |  8 ++++++++
 .../client/trace/DefaultMQConsumerWithTraceTest.java         |  9 +++++++++
 .../client/trace/DefaultMQLitePullConsumerWithTraceTest.java | 12 +++++++++++-
 .../client/trace/DefaultMQProducerWithTraceTest.java         | 10 ++++++++++
 6 files changed, 50 insertions(+), 3 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index c54399a..74d6f34 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -535,7 +535,9 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
     private void setTraceDispatcher() {
         if (isEnableMsgTrace()) {
             try {
-                this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null);
+                AsyncTraceDispatcher traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null);
+                traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS());
+                this.traceDispatcher = traceDispatcher;
                 this.defaultLitePullConsumerImpl.registerConsumeMessageHook(
                     new ConsumeMessageTraceHookImpl(traceDispatcher));
             } catch (Throwable e) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index caf166d..58cf134 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -412,7 +412,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
     public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
         createTopic(key, withNamespace(newTopic), queueNum, 0);
     }
-
+    
+    @Override
+    public void setUseTLS(boolean useTLS) {
+        super.setUseTLS(useTLS);
+        if (traceDispatcher != null && traceDispatcher instanceof AsyncTraceDispatcher) {
+            ((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS);
+        }
+    }
+    
     /**
      * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
      */
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 1c4a931..0705935 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -263,6 +263,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         }
     }
 
+    @Override
+    public void setUseTLS(boolean useTLS) {
+        super.setUseTLS(useTLS);
+        if (traceDispatcher != null && traceDispatcher instanceof AsyncTraceDispatcher) {
+            ((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS);
+        }
+    }
+    
     /**
      * Start this producer instance. </p>
      *
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index aec7d2c..976380b 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -69,6 +69,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -235,6 +236,14 @@ public class DefaultMQConsumerWithTraceTest {
         assertThat(msg.getTopic()).isEqualTo(topic);
         assertThat(msg.getBody()).isEqualTo(new byte[] {'a'});
     }
+    
+    @Test
+    public void testPushConsumerWithTraceTLS() {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup", true);
+        consumer.setUseTLS(true);
+        AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) consumer.getTraceDispatcher();
+        Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS());
+    }
 
     private PullRequest createPullRequest() {
         PullRequest pullRequest = new PullRequest();
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java
index 67ae194..ce3b832 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java
@@ -52,6 +52,7 @@ import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -146,6 +147,15 @@ public class DefaultMQLitePullConsumerWithTraceTest {
         }
     }
 
+    @Test
+    public void testLitePullConsumerWithTraceTLS() throws Exception {
+        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("consumerGroup");
+        consumer.setUseTLS(true);
+        consumer.setEnableMsgTrace(true);
+        consumer.start();
+        AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) consumer.getTraceDispatcher();
+        Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS());
+    }
 
     private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception {
         DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
@@ -302,4 +312,4 @@ public class DefaultMQLitePullConsumerWithTraceTest {
         doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
     }
 
-}
\ No newline at end of file
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
index 62b3417..234e32e 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -143,6 +144,15 @@ public class DefaultMQProducerWithTraceTest {
 
     }
 
+    
+    @Test
+    public void testProducerWithTraceTLS() {
+        DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp, true);
+        producer.setUseTLS(true);
+        AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher();
+        Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS());
+    }
+    
     @After
     public void terminate() {
         producer.shutdown();