You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2019/05/17 08:27:34 UTC

[rocketmq] branch develop updated (ec4e1c2 -> 0c84c73)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git.


    from ec4e1c2  [maven-release-plugin] prepare for next development iteration
     new f39c8f5  (1) Polish message trace target channel (2) Fix the issue that consume message with namespace trace cannot found
     new 0c84c73  Add comment for AccessChannel class

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/rocketmq/client/AccessChannel.java  | 20 +++++++---------
 .../org/apache/rocketmq/client/ClientConfig.java   | 10 ++++++++
 .../client/consumer/DefaultMQPushConsumer.java     |  2 +-
 .../client/producer/DefaultMQProducer.java         | 13 +++++++----
 .../client/trace/AsyncTraceDispatcher.java         | 27 +++++++++++++++-------
 .../rocketmq/client/trace/TraceDispatcher.java     |  3 ++-
 .../trace/hook/ConsumeMessageTraceHookImpl.java    |  7 +++---
 .../trace/hook/SendMessageTraceHookImpl.java       |  9 ++++----
 8 files changed, 57 insertions(+), 34 deletions(-)
 copy filter/src/main/java/org/apache/rocketmq/filter/FilterSpi.java => client/src/main/java/org/apache/rocketmq/client/AccessChannel.java (69%)


[rocketmq] 02/02: Add comment for AccessChannel class

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 0c84c73e130d740be0d65a0cc4c08ace087483a1
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri May 17 10:29:18 2019 +0800

    Add comment for AccessChannel class
---
 .../java/org/apache/rocketmq/client/AccessChannel.java   | 16 ++++++++++++++--
 .../java/org/apache/rocketmq/client/ClientConfig.java    |  2 +-
 .../rocketmq/client/trace/AsyncTraceDispatcher.java      |  4 ++--
 3 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java b/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java
index 82978b0..d6feb57 100644
--- a/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java
+++ b/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java
@@ -15,7 +15,19 @@
  * limitations under the License.
  */
 package org.apache.rocketmq.client;
+
+/**
+ * Used to set the access channel. If you need to migrate the RocketMQ service to the cloud, we recommend setting the
+ * value to "CLOUD"; otherwise set it to "LOCAL". This is especially relevant when the message trace feature is used.
+ */
 public enum AccessChannel {
-    local,
-    cloud,
+    /**
+     * Means connect to private IDC cluster.
+     */
+    LOCAL,
+
+    /**
+     * Means connect to Cloud service.
+     */
+    CLOUD,
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 53ac353..87c01a5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -37,7 +37,7 @@ public class ClientConfig {
     private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
     private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
     protected String namespace;
-    protected AccessChannel accessChannel = AccessChannel.local;
+    protected AccessChannel accessChannel = AccessChannel.LOCAL;
 
     /**
      * Pulling topic information interval from the named server
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 3b5fc1d..ca3bcfa 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -72,7 +72,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     private String dispatcherId = UUID.randomUUID().toString();
     private String traceTopicName;
     private AtomicBoolean isStarted = new AtomicBoolean(false);
-    private AccessChannel accessChannel = AccessChannel.local;
+    private AccessChannel accessChannel = AccessChannel.LOCAL;
 
     public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
         // queueSize is greater than or equal to the n power of 2 of value
@@ -341,7 +341,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
          */
         private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
             String traceTopic = traceTopicName;
-            if (AccessChannel.cloud == accessChannel){
+            if (AccessChannel.CLOUD == accessChannel) {
                 traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
             }
             final Message message = new Message(traceTopic, data.getBytes());


[rocketmq] 01/02: (1) Polish message trace target channel (2) Fix the issue that consume message with namespace trace cannot found

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit f39c8f5e47baa82f94427d60de4b70265d99c0c1
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri May 17 01:38:28 2019 +0800

    (1) Polish message trace target channel (2) Fix the issue that consume message with namespace trace cannot found
---
 .../TraceDispatcher.java => AccessChannel.java}    | 37 +++-------------------
 .../org/apache/rocketmq/client/ClientConfig.java   | 10 ++++++
 .../client/consumer/DefaultMQPushConsumer.java     |  2 +-
 .../client/producer/DefaultMQProducer.java         | 13 +++++---
 .../client/trace/AsyncTraceDispatcher.java         | 27 +++++++++++-----
 .../rocketmq/client/trace/TraceDispatcher.java     |  3 +-
 .../trace/hook/ConsumeMessageTraceHookImpl.java    |  7 ++--
 .../trace/hook/SendMessageTraceHookImpl.java       |  9 +++---
 8 files changed, 53 insertions(+), 55 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java
similarity index 54%
copy from client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
copy to client/src/main/java/org/apache/rocketmq/client/AccessChannel.java
index 275e6a3..82978b0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java
@@ -14,37 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.trace;
-
-import org.apache.rocketmq.client.exception.MQClientException;
-import java.io.IOException;
-
-/**
- * Interface of asynchronous transfer data
- */
-public interface TraceDispatcher {
-
-    /**
-     * Initialize asynchronous transfer data module
-     */
-    void start(String nameSrvAddr) throws MQClientException;
-
-    /**
-     * Append the transfering data
-     * @param ctx data infomation
-     * @return
-     */
-    boolean append(Object ctx);
-
-    /**
-     * Write flush action
-     *
-     * @throws IOException
-     */
-    void flush() throws IOException;
-
-    /**
-     * Close the trace Hook
-     */
-    void shutdown();
+package org.apache.rocketmq.client;
+public enum AccessChannel {
+    local,
+    cloud,
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 6493f2d..53ac353 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -37,6 +37,8 @@ public class ClientConfig {
     private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
     private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
     protected String namespace;
+    protected AccessChannel accessChannel = AccessChannel.local;
+
     /**
      * Pulling topic information interval from the named server
      */
@@ -263,6 +265,14 @@ public class ClientConfig {
         this.namespace = namespace;
     }
 
+    public AccessChannel getAccessChannel() {
+        return this.accessChannel;
+    }
+
+    public void setAccessChannel(AccessChannel accessChannel) {
+        this.accessChannel = accessChannel;
+    }
+
     @Override
     public String toString() {
         return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 44edfb6..339f799 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -693,7 +693,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
         this.defaultMQPushConsumerImpl.start();
         if (null != traceDispatcher) {
             try {
-                traceDispatcher.start(this.getNamesrvAddr());
+                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
             } catch (MQClientException e) {
                 log.warn("trace dispatcher start failed ", e);
             }
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index d5fbde0..b4acf8f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -238,15 +238,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     }
 
     /**
-     * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic name.
+     * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic
+     * name.
      *
      * @param namespace Namespace for this MQ Producer instance.
      * @param producerGroup Producer group, see the name-sake field.
      * @param rpcHook RPC hook to execute per each remoting command execution.
      * @param enableMsgTrace Switch flag instance for message trace.
-     * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
+     * @param customizedTraceTopic The name of the message trace topic. If you don't configure it, you can use the default
+     * trace topic name.
      */
-    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {
+    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
+        boolean enableMsgTrace, final String customizedTraceTopic) {
         this.namespace = namespace;
         this.producerGroup = producerGroup;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
@@ -282,7 +285,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         this.defaultMQProducerImpl.start();
         if (null != traceDispatcher) {
             try {
-                traceDispatcher.start(this.getNamesrvAddr());
+                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
             } catch (MQClientException e) {
                 log.warn("trace dispatcher start failed ", e);
             }
@@ -331,7 +334,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     @Override
     public SendResult send(
         Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        Validators.checkMessage(msg,this);
+        Validators.checkMessage(msg, this);
         msg.setTopic(withNamespace(msg.getTopic()));
         return this.defaultMQProducerImpl.send(msg);
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 0aaadb1..3b5fc1d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
@@ -57,7 +58,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     private final int batchSize;
     private final int maxMsgSize;
     private final DefaultMQProducer traceProducer;
-    private final ThreadPoolExecutor traceExecuter;
+    private final ThreadPoolExecutor traceExecutor;
     // The last discard number of log
     private AtomicLong discardCount;
     private Thread worker;
@@ -71,8 +72,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     private String dispatcherId = UUID.randomUUID().toString();
     private String traceTopicName;
     private AtomicBoolean isStarted = new AtomicBoolean(false);
+    private AccessChannel accessChannel = AccessChannel.local;
 
-    public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {
+    public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
         // queueSize is greater than or equal to the n power of 2 of value
         this.queueSize = 2048;
         this.batchSize = 100;
@@ -85,7 +87,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
         } else {
             this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
         }
-        this.traceExecuter = new ThreadPoolExecutor(//
+        this.traceExecutor = new ThreadPoolExecutor(//
             10, //
             20, //
             1000 * 60, //
@@ -95,6 +97,14 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
         traceProducer = getAndCreateTraceProducer(rpcHook);
     }
 
+    public AccessChannel getAccessChannel() {
+        return accessChannel;
+    }
+
+    public void setAccessChannel(AccessChannel accessChannel) {
+        this.accessChannel = accessChannel;
+    }
+
     public String getTraceTopicName() {
         return traceTopicName;
     }
@@ -123,12 +133,13 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
         this.hostConsumer = hostConsumer;
     }
 
-    public void start(String nameSrvAddr) throws MQClientException {
+    public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
         if (isStarted.compareAndSet(false, true)) {
             traceProducer.setNamesrvAddr(nameSrvAddr);
             traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
             traceProducer.start();
         }
+        this.accessChannel = accessChannel;
         this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
         this.worker.setDaemon(true);
         this.worker.start();
@@ -174,7 +185,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     @Override
     public void shutdown() {
         this.stopped = true;
-        this.traceExecuter.shutdown();
+        this.traceExecutor.shutdown();
         if (isStarted.get()) {
             traceProducer.shutdown();
         }
@@ -231,7 +242,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
                 }
                 if (contexts.size() > 0) {
                     AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
-                    traceExecuter.submit(request);
+                    traceExecutor.submit(request);
                 } else if (AsyncTraceDispatcher.this.stopped) {
                     this.stopped = true;
                 }
@@ -330,11 +341,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
          */
         private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
             String traceTopic = traceTopicName;
-            if (StringUtils.isNotEmpty(regionId) && MixAll.RMQ_SYS_TRACE_TOPIC.equals(traceTopic)) {
+            if (AccessChannel.cloud == accessChannel){
                 traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
             }
             final Message message = new Message(traceTopic, data.getBytes());
-
             // Keyset of message trace includes msgId of or original message
             message.setKeys(keySet);
             try {
@@ -342,6 +352,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
                 SendCallback callback = new SendCallback() {
                     @Override
                     public void onSuccess(SendResult sendResult) {
+
                     }
 
                     @Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
index 275e6a3..51cc0de 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.trace;
 
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.client.exception.MQClientException;
 import java.io.IOException;
 
@@ -27,7 +28,7 @@ public interface TraceDispatcher {
     /**
      * Initialize asynchronous transfer data module
      */
-    void start(String nameSrvAddr) throws MQClientException;
+    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;
 
     /**
      * Append the transfering data
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
index 38ec8b9..f30b121 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
 
 public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
 
@@ -51,7 +52,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
         TraceContext traceContext = new TraceContext();
         context.setMqTraceContext(traceContext);
         traceContext.setTraceType(TraceType.SubBefore);//
-        traceContext.setGroupName(context.getConsumerGroup());//
+        traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));//
         List<TraceBean> beans = new ArrayList<TraceBean>();
         for (MessageExt msg : context.getMsgList()) {
             if (msg == null) {
@@ -65,7 +66,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
                 continue;
             }
             TraceBean traceBean = new TraceBean();
-            traceBean.setTopic(msg.getTopic());//
+            traceBean.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic()));//
             traceBean.setMsgId(msg.getMsgId());//
             traceBean.setTags(msg.getTags());//
             traceBean.setKeys(msg.getKeys());//
@@ -96,7 +97,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
         TraceContext subAfterContext = new TraceContext();
         subAfterContext.setTraceType(TraceType.SubAfter);//
         subAfterContext.setRegionId(subBeforeContext.getRegionId());//
-        subAfterContext.setGroupName(subBeforeContext.getGroupName());//
+        subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//
         subAfterContext.setRequestId(subBeforeContext.getRequestId());//
         subAfterContext.setSuccess(context.isSuccess());//
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
index 20396c6..80c7bab 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
@@ -16,15 +16,16 @@
  */
 package org.apache.rocketmq.client.trace.hook;
 
+import java.util.ArrayList;
 import org.apache.rocketmq.client.hook.SendMessageContext;
 import org.apache.rocketmq.client.hook.SendMessageHook;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
+import org.apache.rocketmq.client.trace.TraceBean;
 import org.apache.rocketmq.client.trace.TraceContext;
 import org.apache.rocketmq.client.trace.TraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceBean;
 import org.apache.rocketmq.client.trace.TraceType;
-import java.util.ArrayList;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
 
 public class SendMessageTraceHookImpl implements SendMessageHook {
 
@@ -50,10 +51,10 @@ public class SendMessageTraceHookImpl implements SendMessageHook {
         tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
         context.setMqTraceContext(tuxeContext);
         tuxeContext.setTraceType(TraceType.Pub);
-        tuxeContext.setGroupName(context.getProducerGroup());
+        tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
         //build the data bean object of message trace
         TraceBean traceBean = new TraceBean();
-        traceBean.setTopic(context.getMessage().getTopic());
+        traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
         traceBean.setTags(context.getMessage().getTags());
         traceBean.setKeys(context.getMessage().getKeys());
         traceBean.setStoreHost(context.getBrokerAddr());