You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/11/02 23:04:26 UTC
[skywalking-java] branch main updated: Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span (#369)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git
The following commit(s) were added to refs/heads/main by this push:
new 6648e450b2 Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span (#369)
6648e450b2 is described below
commit 6648e450b20a66005321c730931fd249de85826b
Author: zhyyu <zh...@gmail.com>
AuthorDate: Thu Nov 3 07:04:20 2022 +0800
Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span (#369)
* add rocketmq consumer tag
* update CHANGES.md
* Update CHANGES.md
* change consumer tag not null to not blank
* set rocketmq consumer span's peer to namesrv
Co-authored-by: yuzhongyu <yu...@cestc.cn>
Co-authored-by: 吴晟 Wu Sheng <wu...@foxmail.com>
---
CHANGES.md | 1 +
.../v4/AbstractMessageConsumeInterceptor.java | 15 ++++-
.../v4/RegisterMessageListenerInterceptor.java | 51 ++++++++++++++++
.../rocketMQ/v4/define/ConsumerEnhanceInfos.java | 32 ++++++++++
.../DefaultMQPushConsumerInstrumentation.java | 69 ++++++++++++++++++++++
.../src/main/resources/skywalking-plugin.def | 1 +
.../rocketmq-scenario/config/expectedData.yaml | 4 +-
7 files changed, 171 insertions(+), 2 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index cbb2173f78..6cb3f36630 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -24,6 +24,7 @@ Release Notes.
* Polish up activemq plugin to fix missing broker tag on consumer side
* Enhance MQ plugin relative tests to check key tags not blank.
* Add RocketMQ test scenarios for version 4.3 - 4.9. No 4.0 - 4.2 release images for testing.
+* Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span.
#### Documentation
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/AbstractMessageConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/AbstractMessageConsumeInterceptor.java
index 61b35544b4..a7269b9540 100644
--- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/AbstractMessageConsumeInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/AbstractMessageConsumeInterceptor.java
@@ -20,16 +20,19 @@ package org.apache.skywalking.apm.plugin.rocketMQ.v4;
import java.lang.reflect.Method;
import java.util.List;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.apache.skywalking.apm.plugin.rocketMQ.v4.define.ConsumerEnhanceInfos;
/**
* {@link AbstractMessageConsumeInterceptor} create entry span when the <code>consumeMessage</code> in the {@link
@@ -48,13 +51,23 @@ public abstract class AbstractMessageConsumeInterceptor implements InstanceMetho
ContextCarrier contextCarrier = getContextCarrierFromMessage(msgs.get(0));
AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + msgs.get(0)
.getTopic() + "/Consumer", contextCarrier);
-
+ Tags.MQ_TOPIC.set(span, msgs.get(0).getTopic());
+ if (msgs.get(0).getStoreHost() != null) {
+ String brokerAddress = msgs.get(0).getStoreHost().toString();
+ brokerAddress = StringUtils.removeStart(brokerAddress, "/");
+ Tags.MQ_BROKER.set(span, brokerAddress);
+ }
span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
SpanLayer.asMQ(span);
for (int i = 1; i < msgs.size(); i++) {
ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));
}
+ Object skyWalkingDynamicField = objInst.getSkyWalkingDynamicField();
+ if (skyWalkingDynamicField != null) {
+ ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos) skyWalkingDynamicField;
+ span.setPeer(consumerEnhanceInfos.getNamesrvAddr());
+ }
}
@Override
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/RegisterMessageListenerInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/RegisterMessageListenerInterceptor.java
new file mode 100644
index 0000000000..8a29665390
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/RegisterMessageListenerInterceptor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v4;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.plugin.rocketMQ.v4.define.ConsumerEnhanceInfos;
+
+import java.lang.reflect.Method;
+
+public class RegisterMessageListenerInterceptor implements InstanceMethodsAroundInterceptor {
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+ DefaultMQPushConsumer defaultMQPushConsumer = (DefaultMQPushConsumer) objInst;
+ String namesrvAddr = defaultMQPushConsumer.getNamesrvAddr();
+ ConsumerEnhanceInfos consumerEnhanceInfos = new ConsumerEnhanceInfos(namesrvAddr);
+
+ if (allArguments[0] instanceof EnhancedInstance) {
+ EnhancedInstance enhancedMessageListener = (EnhancedInstance) allArguments[0];
+ enhancedMessageListener.setSkyWalkingDynamicField(consumerEnhanceInfos);
+ }
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumerEnhanceInfos.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumerEnhanceInfos.java
new file mode 100644
index 0000000000..a58eac219f
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumerEnhanceInfos.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v4.define;
+
+public class ConsumerEnhanceInfos {
+
+ private String namesrvAddr;
+
+ public ConsumerEnhanceInfos(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ }
+
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/DefaultMQPushConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/DefaultMQPushConsumerInstrumentation.java
new file mode 100644
index 0000000000..d7b8ccdb82
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/DefaultMQPushConsumerInstrumentation.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v4.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class DefaultMQPushConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPushConsumer";
+ private static final String REGISTER_MESSAGE_LISTENER_METHOD_NAME = "registerMessageListener";
+ public static final String REGISTER_MESSAGE_LISTENER_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v4.RegisterMessageListenerInterceptor";
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription> getMethodsMatcher() {
+ return named(REGISTER_MESSAGE_LISTENER_METHOD_NAME);
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return REGISTER_MESSAGE_LISTENER_INTERCEPT_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def
index 8fabe1be93..3558088167 100644
--- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def
@@ -18,3 +18,4 @@ rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.ConsumeMessageC
rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.ConsumeMessageOrderlyInstrumentation
rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.MQClientAPIImplInstrumentation
rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.SendCallbackInstrumentation
+rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.DefaultMQPushConsumerInstrumentation
diff --git a/test/plugin/scenarios/rocketmq-scenario/config/expectedData.yaml b/test/plugin/scenarios/rocketmq-scenario/config/expectedData.yaml
index 7991de68f5..3d669b5f2d 100644
--- a/test/plugin/scenarios/rocketmq-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/rocketmq-scenario/config/expectedData.yaml
@@ -59,9 +59,11 @@ segmentItems:
componentId: 39
isError: false
spanType: Entry
- peer: ''
+ peer: not blank
tags:
- {key: transmission.latency, value: not null}
+ - {key: mq.topic, value: TopicTest }
+ - {key: mq.broker, value: not blank }
refs:
- {parentEndpoint: GET:/case/rocketmq-scenario, networkAddress: not null,
refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null,