You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2019/05/17 08:27:35 UTC
[rocketmq] 01/02: (1) Polish message trace target channel (2) Fix
the issue that the trace of a message consumed with a namespace cannot be found
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit f39c8f5e47baa82f94427d60de4b70265d99c0c1
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri May 17 01:38:28 2019 +0800
(1) Polish message trace target channel (2) Fix the issue that the trace of a message consumed with a namespace cannot be found
---
.../TraceDispatcher.java => AccessChannel.java} | 37 +++-------------------
.../org/apache/rocketmq/client/ClientConfig.java | 10 ++++++
.../client/consumer/DefaultMQPushConsumer.java | 2 +-
.../client/producer/DefaultMQProducer.java | 13 +++++---
.../client/trace/AsyncTraceDispatcher.java | 27 +++++++++++-----
.../rocketmq/client/trace/TraceDispatcher.java | 3 +-
.../trace/hook/ConsumeMessageTraceHookImpl.java | 7 ++--
.../trace/hook/SendMessageTraceHookImpl.java | 9 +++---
8 files changed, 53 insertions(+), 55 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java
similarity index 54%
copy from client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
copy to client/src/main/java/org/apache/rocketmq/client/AccessChannel.java
index 275e6a3..82978b0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/AccessChannel.java
@@ -14,37 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.trace;
-
-import org.apache.rocketmq.client.exception.MQClientException;
-import java.io.IOException;
-
-/**
- * Interface of asynchronous transfer data
- */
-public interface TraceDispatcher {
-
- /**
- * Initialize asynchronous transfer data module
- */
- void start(String nameSrvAddr) throws MQClientException;
-
- /**
- * Append the transfering data
- * @param ctx data infomation
- * @return
- */
- boolean append(Object ctx);
-
- /**
- * Write flush action
- *
- * @throws IOException
- */
- void flush() throws IOException;
-
- /**
- * Close the trace Hook
- */
- void shutdown();
+package org.apache.rocketmq.client;
+public enum AccessChannel {
+ local,
+ cloud,
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 6493f2d..53ac353 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -37,6 +37,8 @@ public class ClientConfig {
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
protected String namespace;
+ protected AccessChannel accessChannel = AccessChannel.local;
+
/**
* Pulling topic information interval from the named server
*/
@@ -263,6 +265,14 @@ public class ClientConfig {
this.namespace = namespace;
}
+ public AccessChannel getAccessChannel() {
+ return this.accessChannel;
+ }
+
+ public void setAccessChannel(AccessChannel accessChannel) {
+ this.accessChannel = accessChannel;
+ }
+
@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 44edfb6..339f799 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -693,7 +693,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
- traceDispatcher.start(this.getNamesrvAddr());
+ traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index d5fbde0..b4acf8f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -238,15 +238,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
- * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic name.
+ * Constructor specifying namespace, producer group, RPC hook, enabled msgTrace flag and customized trace topic
+ * name.
*
* @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
* @param enableMsgTrace Switch flag instance for message trace.
- * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+ * trace topic name.
*/
- public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {
+ public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
+ boolean enableMsgTrace, final String customizedTraceTopic) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
@@ -282,7 +285,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
- traceDispatcher.start(this.getNamesrvAddr());
+ traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
@@ -331,7 +334,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- Validators.checkMessage(msg,this);
+ Validators.checkMessage(msg, this);
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 0aaadb1..3b5fc1d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
@@ -57,7 +58,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private final int batchSize;
private final int maxMsgSize;
private final DefaultMQProducer traceProducer;
- private final ThreadPoolExecutor traceExecuter;
+ private final ThreadPoolExecutor traceExecutor;
// The last discard number of log
private AtomicLong discardCount;
private Thread worker;
@@ -71,8 +72,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false);
+ private AccessChannel accessChannel = AccessChannel.local;
- public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {
+ public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
// queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048;
this.batchSize = 100;
@@ -85,7 +87,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
} else {
this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
}
- this.traceExecuter = new ThreadPoolExecutor(//
+ this.traceExecutor = new ThreadPoolExecutor(//
10, //
20, //
1000 * 60, //
@@ -95,6 +97,14 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
traceProducer = getAndCreateTraceProducer(rpcHook);
}
+ public AccessChannel getAccessChannel() {
+ return accessChannel;
+ }
+
+ public void setAccessChannel(AccessChannel accessChannel) {
+ this.accessChannel = accessChannel;
+ }
+
public String getTraceTopicName() {
return traceTopicName;
}
@@ -123,12 +133,13 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.hostConsumer = hostConsumer;
}
- public void start(String nameSrvAddr) throws MQClientException {
+ public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}
+ this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
@@ -174,7 +185,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
@Override
public void shutdown() {
this.stopped = true;
- this.traceExecuter.shutdown();
+ this.traceExecutor.shutdown();
if (isStarted.get()) {
traceProducer.shutdown();
}
@@ -231,7 +242,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
}
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
- traceExecuter.submit(request);
+ traceExecutor.submit(request);
} else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
@@ -330,11 +341,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
*/
private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
String traceTopic = traceTopicName;
- if (StringUtils.isNotEmpty(regionId) && MixAll.RMQ_SYS_TRACE_TOPIC.equals(traceTopic)) {
+ if (AccessChannel.cloud == accessChannel){
traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
}
final Message message = new Message(traceTopic, data.getBytes());
-
// Keyset of message trace includes msgId of or original message
message.setKeys(keySet);
try {
@@ -342,6 +352,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
+
}
@Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
index 275e6a3..51cc0de 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.trace;
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import java.io.IOException;
@@ -27,7 +28,7 @@ public interface TraceDispatcher {
/**
* Initialize asynchronous transfer data module
*/
- void start(String nameSrvAddr) throws MQClientException;
+ void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;
/**
* Append the transfering data
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
index 38ec8b9..f30b121 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import java.util.ArrayList;
import java.util.List;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
@@ -51,7 +52,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
TraceContext traceContext = new TraceContext();
context.setMqTraceContext(traceContext);
traceContext.setTraceType(TraceType.SubBefore);//
- traceContext.setGroupName(context.getConsumerGroup());//
+ traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));//
List<TraceBean> beans = new ArrayList<TraceBean>();
for (MessageExt msg : context.getMsgList()) {
if (msg == null) {
@@ -65,7 +66,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
continue;
}
TraceBean traceBean = new TraceBean();
- traceBean.setTopic(msg.getTopic());//
+ traceBean.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic()));//
traceBean.setMsgId(msg.getMsgId());//
traceBean.setTags(msg.getTags());//
traceBean.setKeys(msg.getKeys());//
@@ -96,7 +97,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
TraceContext subAfterContext = new TraceContext();
subAfterContext.setTraceType(TraceType.SubAfter);//
subAfterContext.setRegionId(subBeforeContext.getRegionId());//
- subAfterContext.setGroupName(subBeforeContext.getGroupName());//
+ subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//
subAfterContext.setRequestId(subBeforeContext.getRequestId());//
subAfterContext.setSuccess(context.isSuccess());//
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
index 20396c6..80c7bab 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
@@ -16,15 +16,16 @@
*/
package org.apache.rocketmq.client.trace.hook;
+import java.util.ArrayList;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
+import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceType;
-import java.util.ArrayList;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
public class SendMessageTraceHookImpl implements SendMessageHook {
@@ -50,10 +51,10 @@ public class SendMessageTraceHookImpl implements SendMessageHook {
tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
context.setMqTraceContext(tuxeContext);
tuxeContext.setTraceType(TraceType.Pub);
- tuxeContext.setGroupName(context.getProducerGroup());
+ tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
//build the data bean object of message trace
TraceBean traceBean = new TraceBean();
- traceBean.setTopic(context.getMessage().getTopic());
+ traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());