You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2019/01/17 07:24:36 UTC

[rocketmq] branch develop updated: fix bug: when producers send msg to multi clusters, only one cluster can receive message trace message (#694)

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new eeb1407  fix bug: when producers send msg to multi clusters, only one cluster can receive message trace message (#694)
eeb1407 is described below

commit eeb1407032ec383ca8d2610af8dd1f3d5dca4994
Author: superheizai <su...@aliyun.com>
AuthorDate: Thu Jan 17 15:24:31 2019 +0800

    fix bug: when producers send msg to multi clusters, only one cluster can receive message trace message (#694)
    
    * make isStarted as not static; rename  instanceName with nameSrv address as subfix to fix connect multi cluster
    
    * fix checkstyle error
    
    * remove useless mock in DefaultMQProducerWithTraceTest
---
 .../client/trace/AsyncTraceDispatcher.java         | 41 ++++++++--------
 .../trace/DefaultMQProducerWithTraceTest.java      | 55 +++++++++-------------
 2 files changed, 42 insertions(+), 54 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 5542324..87a795e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.client.trace;
 
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
@@ -33,20 +32,22 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.RPCHook;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.HashMap;
-import java.util.UUID;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.ArrayList;
-import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.rocketmq.remoting.RPCHook;
+import java.util.UUID;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+
 
 import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
 
@@ -70,7 +71,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
     private String dispatcherId = UUID.randomUUID().toString();
     private String traceTopicName;
-    private static AtomicBoolean isStarted = new AtomicBoolean(false);
+    private AtomicBoolean isStarted = new AtomicBoolean(false);
 
 
     public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {
@@ -87,12 +88,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
             this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
         }
         this.traceExecuter = new ThreadPoolExecutor(//
-            10, //
-            20, //
-            1000 * 60, //
-            TimeUnit.MILLISECONDS, //
-            this.appenderQueue, //
-            new ThreadFactoryImpl("MQTraceSendThread_"));
+                10, //
+                20, //
+                1000 * 60, //
+                TimeUnit.MILLISECONDS, //
+                this.appenderQueue, //
+                new ThreadFactoryImpl("MQTraceSendThread_"));
         traceProducer = getAndCreateTraceProducer(rpcHook);
     }
 
@@ -103,11 +104,11 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     public void setTraceTopicName(String traceTopicName) {
         this.traceTopicName = traceTopicName;
     }
-    
+
     public DefaultMQProducer getTraceProducer() {
         return traceProducer;
     }
-    
+
     public DefaultMQProducerImpl getHostProducer() {
         return hostProducer;
     }
@@ -127,6 +128,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     public void start(String nameSrvAddr) throws MQClientException {
         if (isStarted.compareAndSet(false, true)) {
             traceProducer.setNamesrvAddr(nameSrvAddr);
+            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
             traceProducer.start();
         }
         this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
@@ -141,7 +143,6 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
             traceProducerInstance = new DefaultMQProducer(rpcHook);
             traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
             traceProducerInstance.setSendMsgTimeout(5000);
-            traceProducerInstance.setInstanceName(TRACE_INSTANCE_NAME);
             traceProducerInstance.setVipChannelEnabled(false);
             // The max size of message is 128K
             traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
@@ -256,7 +257,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
         public void run() {
             sendTraceData(contextList);
         }
-        
+
         public void sendTraceData(List<TraceContext> contextList) {
             Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
             for (TraceContext context : contextList) {
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
index 6dcceeb..903be01 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
@@ -17,12 +17,6 @@
 
 package org.apache.rocketmq.client.trace;
 
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -52,11 +46,14 @@ import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.nullable;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -67,11 +64,6 @@ public class DefaultMQProducerWithTraceTest {
     @Mock
     private MQClientAPIImpl mQClientAPIImpl;
 
-    @Spy
-    private MQClientInstance mQClientTraceFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
-    @Mock
-    private MQClientAPIImpl mQClientTraceAPIImpl;
-
     private AsyncTraceDispatcher asyncTraceDispatcher;
 
     private DefaultMQProducer producer;
@@ -89,56 +81,52 @@ public class DefaultMQProducerWithTraceTest {
     @Before
     public void init() throws Exception {
 
-        customTraceTopicproducer = new DefaultMQProducer(producerGroupTemp,false, customerTraceTopic);
-        normalProducer = new DefaultMQProducer(producerGroupTemp,false,"");
-        producer = new DefaultMQProducer(producerGroupTemp,true,"");
+        customTraceTopicproducer = new DefaultMQProducer(producerGroupTemp, false, customerTraceTopic);
+        normalProducer = new DefaultMQProducer(producerGroupTemp, false, "");
+        producer = new DefaultMQProducer(producerGroupTemp, true, "");
         producer.setNamesrvAddr("127.0.0.1:9876");
         normalProducer.setNamesrvAddr("127.0.0.1:9877");
         customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
-        message = new Message(topic, new byte[] {'a', 'b' ,'c'});
-        asyncTraceDispatcher = (AsyncTraceDispatcher)producer.getTraceDispatcher();
+        message = new Message(topic, new byte[]{'a', 'b', 'c'});
+        asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher();
         asyncTraceDispatcher.setTraceTopicName(customerTraceTopic);
         asyncTraceDispatcher.getHostProducer();
         asyncTraceDispatcher.getHostConsumer();
         traceProducer = asyncTraceDispatcher.getTraceProducer();
 
         producer.start();
-        
+
         Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
         field.setAccessible(true);
         field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
 
         Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
         fieldTrace.setAccessible(true);
-        fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientTraceFactory);
+        fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientFactory);
 
         field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
         field.setAccessible(true);
         field.set(mQClientFactory, mQClientAPIImpl);
 
-        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
-        field.setAccessible(true);
-        field.set(mQClientTraceFactory, mQClientTraceAPIImpl);
 
         producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
 
         when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
-            nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
+                nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
         when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
-            nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
-            .thenReturn(createSendResult(SendStatus.SEND_OK));
-        
+                nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
+                .thenReturn(createSendResult(SendStatus.SEND_OK));
+
     }
 
     @Test
     public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
         traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
         when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
-        when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute());
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         try {
             producer.send(message);
-        }catch (MQClientException e){
+        } catch (MQClientException e) {
         }
         countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
 
@@ -147,11 +135,10 @@ public class DefaultMQProducerWithTraceTest {
     @Test
     public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
         when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
-        when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute());
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         try {
             producer.send(message);
-        }catch (MQClientException e){
+        } catch (MQClientException e) {
         }
         countDownLatch.await(3000L, TimeUnit.MILLISECONDS);