You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2019/05/17 08:27:35 UTC

[rocketmq] 01/02: (1) Polish message trace target channel (2) Fix the issue that the trace of a message consumed with a namespace cannot be found

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit f39c8f5e47baa82f94427d60de4b70265d99c0c1
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri May 17 01:38:28 2019 +0800

    (1) Polish message trace target channel (2) Fix the issue that the trace of a message consumed with a namespace cannot be found
---
 .../TraceDispatcher.java => AccessChannel.java}    | 37 +++-------------------
 .../org/apache/rocketmq/client/ClientConfig.java   | 10 ++++++
 .../client/consumer/DefaultMQPushConsumer.java     |  2 +-
 .../client/producer/DefaultMQProducer.java         | 13 +++++---
 .../client/trace/AsyncTraceDispatcher.java         | 27 +++++++++++-----
 .../rocketmq/client/trace/TraceDispatcher.java     |  3 +-
 .../trace/hook/ConsumeMessageTraceHookImpl.java    |  7 ++--
 .../trace/hook/SendMessageTraceHookImpl.java       |  9 +++---
 8 files changed, 53 insertions(+), 55 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java
similarity index 54%
copy from client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
copy to client/src/main/java/org/apache/rocketmq/client/AccessChannel.java
index 275e6a3..82978b0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java
@@ -14,37 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.trace;
-
-import org.apache.rocketmq.client.exception.MQClientException;
-import java.io.IOException;
-
-/**
- * Interface of asynchronous transfer data
- */
-public interface TraceDispatcher {
-
-    /**
-     * Initialize asynchronous transfer data module
-     */
-    void start(String nameSrvAddr) throws MQClientException;
-
-    /**
-     * Append the transfering data
-     * @param ctx data infomation
-     * @return
-     */
-    boolean append(Object ctx);
-
-    /**
-     * Write flush action
-     *
-     * @throws IOException
-     */
-    void flush() throws IOException;
-
-    /**
-     * Close the trace Hook
-     */
-    void shutdown();
+package org.apache.rocketmq.client;
+public enum AccessChannel {
+    local,
+    cloud,
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 6493f2d..53ac353 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -37,6 +37,8 @@ public class ClientConfig {
     private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
     private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
     protected String namespace;
+    protected AccessChannel accessChannel = AccessChannel.local;
+
     /**
      * Pulling topic information interval from the named server
      */
@@ -263,6 +265,14 @@ public class ClientConfig {
         this.namespace = namespace;
     }
 
+    public AccessChannel getAccessChannel() {
+        return this.accessChannel;
+    }
+
+    public void setAccessChannel(AccessChannel accessChannel) {
+        this.accessChannel = accessChannel;
+    }
+
     @Override
     public String toString() {
         return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 44edfb6..339f799 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -693,7 +693,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
         this.defaultMQPushConsumerImpl.start();
         if (null != traceDispatcher) {
             try {
-                traceDispatcher.start(this.getNamesrvAddr());
+                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
             } catch (MQClientException e) {
                 log.warn("trace dispatcher start failed ", e);
             }
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index d5fbde0..b4acf8f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -238,15 +238,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     }
 
     /**
-     * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic name.
+     * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic
+     * name.
      *
      * @param namespace Namespace for this MQ Producer instance.
      * @param producerGroup Producer group, see the name-sake field.
      * @param rpcHook RPC hook to execute per each remoting command execution.
      * @param enableMsgTrace Switch flag instance for message trace.
-     * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
+     * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+     * trace topic name.
      */
-    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {
+    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
+        boolean enableMsgTrace, final String customizedTraceTopic) {
         this.namespace = namespace;
         this.producerGroup = producerGroup;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
@@ -282,7 +285,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         this.defaultMQProducerImpl.start();
         if (null != traceDispatcher) {
             try {
-                traceDispatcher.start(this.getNamesrvAddr());
+                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
             } catch (MQClientException e) {
                 log.warn("trace dispatcher start failed ", e);
             }
@@ -331,7 +334,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     @Override
     public SendResult send(
         Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        Validators.checkMessage(msg,this);
+        Validators.checkMessage(msg, this);
         msg.setTopic(withNamespace(msg.getTopic()));
         return this.defaultMQProducerImpl.send(msg);
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 0aaadb1..3b5fc1d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
@@ -57,7 +58,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     private final int batchSize;
     private final int maxMsgSize;
     private final DefaultMQProducer traceProducer;
-    private final ThreadPoolExecutor traceExecuter;
+    private final ThreadPoolExecutor traceExecutor;
     // The last discard number of log
     private AtomicLong discardCount;
     private Thread worker;
@@ -71,8 +72,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     private String dispatcherId = UUID.randomUUID().toString();
     private String traceTopicName;
     private AtomicBoolean isStarted = new AtomicBoolean(false);
+    private AccessChannel accessChannel = AccessChannel.local;
 
-    public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {
+    public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
         // queueSize is greater than or equal to the n power of 2 of value
         this.queueSize = 2048;
         this.batchSize = 100;
@@ -85,7 +87,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
         } else {
             this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
         }
-        this.traceExecuter = new ThreadPoolExecutor(//
+        this.traceExecutor = new ThreadPoolExecutor(//
             10, //
             20, //
             1000 * 60, //
@@ -95,6 +97,14 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
         traceProducer = getAndCreateTraceProducer(rpcHook);
     }
 
+    public AccessChannel getAccessChannel() {
+        return accessChannel;
+    }
+
+    public void setAccessChannel(AccessChannel accessChannel) {
+        this.accessChannel = accessChannel;
+    }
+
     public String getTraceTopicName() {
         return traceTopicName;
     }
@@ -123,12 +133,13 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
         this.hostConsumer = hostConsumer;
     }
 
-    public void start(String nameSrvAddr) throws MQClientException {
+    public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
         if (isStarted.compareAndSet(false, true)) {
             traceProducer.setNamesrvAddr(nameSrvAddr);
             traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
             traceProducer.start();
         }
+        this.accessChannel = accessChannel;
         this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
         this.worker.setDaemon(true);
         this.worker.start();
@@ -174,7 +185,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     @Override
     public void shutdown() {
         this.stopped = true;
-        this.traceExecuter.shutdown();
+        this.traceExecutor.shutdown();
         if (isStarted.get()) {
             traceProducer.shutdown();
         }
@@ -231,7 +242,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
                 }
                 if (contexts.size() > 0) {
                     AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
-                    traceExecuter.submit(request);
+                    traceExecutor.submit(request);
                 } else if (AsyncTraceDispatcher.this.stopped) {
                     this.stopped = true;
                 }
@@ -330,11 +341,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
          */
         private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
             String traceTopic = traceTopicName;
-            if (StringUtils.isNotEmpty(regionId) && MixAll.RMQ_SYS_TRACE_TOPIC.equals(traceTopic)) {
+            if (AccessChannel.cloud == accessChannel){
                 traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
             }
             final Message message = new Message(traceTopic, data.getBytes());
-
             // Keyset of message trace includes msgId of or original message
             message.setKeys(keySet);
             try {
@@ -342,6 +352,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
                 SendCallback callback = new SendCallback() {
                     @Override
                     public void onSuccess(SendResult sendResult) {
+
                     }
 
                     @Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
index 275e6a3..51cc0de 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.trace;
 
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.exception.MQClientException;
 import java.io.IOException;
 
@@ -27,7 +28,7 @@ public interface TraceDispatcher {
     /**
      * Initialize asynchronous transfer data module
      */
-    void start(String nameSrvAddr) throws MQClientException;
+    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;
 
     /**
      * Append the transfering data
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
index 38ec8b9..f30b121 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
 
 public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
 
@@ -51,7 +52,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
         TraceContext traceContext = new TraceContext();
         context.setMqTraceContext(traceContext);
         traceContext.setTraceType(TraceType.SubBefore);//
-        traceContext.setGroupName(context.getConsumerGroup());//
+        traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));//
         List<TraceBean> beans = new ArrayList<TraceBean>();
         for (MessageExt msg : context.getMsgList()) {
             if (msg == null) {
@@ -65,7 +66,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
                 continue;
             }
             TraceBean traceBean = new TraceBean();
-            traceBean.setTopic(msg.getTopic());//
+            traceBean.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic()));//
             traceBean.setMsgId(msg.getMsgId());//
             traceBean.setTags(msg.getTags());//
             traceBean.setKeys(msg.getKeys());//
@@ -96,7 +97,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
         TraceContext subAfterContext = new TraceContext();
         subAfterContext.setTraceType(TraceType.SubAfter);//
         subAfterContext.setRegionId(subBeforeContext.getRegionId());//
-        subAfterContext.setGroupName(subBeforeContext.getGroupName());//
+        subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//
         subAfterContext.setRequestId(subBeforeContext.getRequestId());//
         subAfterContext.setSuccess(context.isSuccess());//
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
index 20396c6..80c7bab 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
@@ -16,15 +16,16 @@
  */
 package org.apache.rocketmq.client.trace.hook;
 
+import java.util.ArrayList;
 import org.apache.rocketmq.client.hook.SendMessageContext;
 import org.apache.rocketmq.client.hook.SendMessageHook;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
+import org.apache.rocketmq.client.trace.TraceBean;
 import org.apache.rocketmq.client.trace.TraceContext;
 import org.apache.rocketmq.client.trace.TraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceBean;
 import org.apache.rocketmq.client.trace.TraceType;
-import java.util.ArrayList;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
 
 public class SendMessageTraceHookImpl implements SendMessageHook {
 
@@ -50,10 +51,10 @@ public class SendMessageTraceHookImpl implements SendMessageHook {
         tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
         context.setMqTraceContext(tuxeContext);
         tuxeContext.setTraceType(TraceType.Pub);
-        tuxeContext.setGroupName(context.getProducerGroup());
+        tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
         //build the data bean object of message trace
         TraceBean traceBean = new TraceBean();
-        traceBean.setTopic(context.getMessage().getTopic());
+        traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
         traceBean.setTags(context.getMessage().getTags());
         traceBean.setKeys(context.getMessage().getKeys());
         traceBean.setStoreHost(context.getBrokerAddr());