You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/11/02 23:04:26 UTC

[skywalking-java] branch main updated: Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span (#369)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git


The following commit(s) were added to refs/heads/main by this push:
     new 6648e450b2 Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span (#369)
6648e450b2 is described below

commit 6648e450b20a66005321c730931fd249de85826b
Author: zhyyu <zh...@gmail.com>
AuthorDate: Thu Nov 3 07:04:20 2022 +0800

    Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span (#369)
    
    * add rocketmq consumer tag
    
    * update CHANGES.md
    
    * Update CHANGES.md
    
    * change consumer tag not null to not blank
    
    * set rocketmq consumer span's peer to namesrv
    
    Co-authored-by: yuzhongyu <yu...@cestc.cn>
    Co-authored-by: 吴晟 Wu Sheng <wu...@foxmail.com>
---
 CHANGES.md                                         |  1 +
 .../v4/AbstractMessageConsumeInterceptor.java      | 15 ++++-
 .../v4/RegisterMessageListenerInterceptor.java     | 51 ++++++++++++++++
 .../rocketMQ/v4/define/ConsumerEnhanceInfos.java   | 32 ++++++++++
 .../DefaultMQPushConsumerInstrumentation.java      | 69 ++++++++++++++++++++++
 .../src/main/resources/skywalking-plugin.def       |  1 +
 .../rocketmq-scenario/config/expectedData.yaml     |  4 +-
 7 files changed, 171 insertions(+), 2 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index cbb2173f78..6cb3f36630 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -24,6 +24,7 @@ Release Notes.
 * Polish up activemq plugin to fix missing broker tag on consumer side
 * Enhance MQ plugin relative tests to check key tags not blank.
 * Add RocketMQ test scenarios for version 4.3 - 4.9. No 4.0 - 4.2 release images for testing.
+* Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span. 
 
 #### Documentation
 
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/AbstractMessageConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/AbstractMessageConsumeInterceptor.java
index 61b35544b4..a7269b9540 100644
--- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/AbstractMessageConsumeInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/AbstractMessageConsumeInterceptor.java
@@ -20,16 +20,19 @@ package org.apache.skywalking.apm.plugin.rocketMQ.v4;
 
 import java.lang.reflect.Method;
 import java.util.List;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.skywalking.apm.agent.core.context.CarrierItem;
 import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
 import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
 import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
 import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.apache.skywalking.apm.plugin.rocketMQ.v4.define.ConsumerEnhanceInfos;
 
 /**
  * {@link AbstractMessageConsumeInterceptor} create entry span when the <code>consumeMessage</code> in the {@link
@@ -48,13 +51,23 @@ public abstract class AbstractMessageConsumeInterceptor implements InstanceMetho
         ContextCarrier contextCarrier = getContextCarrierFromMessage(msgs.get(0));
         AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + msgs.get(0)
                                                                                                 .getTopic() + "/Consumer", contextCarrier);
-
+        Tags.MQ_TOPIC.set(span, msgs.get(0).getTopic());
+        if (msgs.get(0).getStoreHost() != null) {
+            String brokerAddress = msgs.get(0).getStoreHost().toString();
+            brokerAddress = StringUtils.removeStart(brokerAddress, "/");
+            Tags.MQ_BROKER.set(span, brokerAddress);
+        }
         span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
         SpanLayer.asMQ(span);
         for (int i = 1; i < msgs.size(); i++) {
             ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));
         }
 
+        Object skyWalkingDynamicField = objInst.getSkyWalkingDynamicField();
+        if (skyWalkingDynamicField != null) {
+            ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos) skyWalkingDynamicField;
+            span.setPeer(consumerEnhanceInfos.getNamesrvAddr());
+        }
     }
 
     @Override
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/RegisterMessageListenerInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/RegisterMessageListenerInterceptor.java
new file mode 100644
index 0000000000..8a29665390
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/RegisterMessageListenerInterceptor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v4;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.plugin.rocketMQ.v4.define.ConsumerEnhanceInfos;
+
+import java.lang.reflect.Method;
+
+public class RegisterMessageListenerInterceptor implements InstanceMethodsAroundInterceptor {
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+        DefaultMQPushConsumer defaultMQPushConsumer = (DefaultMQPushConsumer) objInst;
+        String namesrvAddr = defaultMQPushConsumer.getNamesrvAddr(); // NOTE(review): read once, at registration; later changes on the consumer are not picked up here
+        ConsumerEnhanceInfos consumerEnhanceInfos = new ConsumerEnhanceInfos(namesrvAddr);
+        // Hand the name server address to the listener so AbstractMessageConsumeInterceptor can read it back from the dynamic field.
+        if (allArguments[0] instanceof EnhancedInstance) { // listeners that are not EnhancedInstance are left untouched
+            EnhancedInstance enhancedMessageListener = (EnhancedInstance) allArguments[0];
+            enhancedMessageListener.setSkyWalkingDynamicField(consumerEnhanceInfos);
+        }
+    }
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
+        return ret; // the intercepted call's result is passed through unchanged
+    }
+
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+        // Intentionally empty: nothing is recorded or cleaned up when the intercepted call throws.
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumerEnhanceInfos.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumerEnhanceInfos.java
new file mode 100644
index 0000000000..a58eac219f
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumerEnhanceInfos.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v4.define;
+
+public class ConsumerEnhanceInfos {
+    // Name server address taken from the consumer by RegisterMessageListenerInterceptor; used as the consume span's peer.
+    private String namesrvAddr;
+
+    public ConsumerEnhanceInfos(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr; // stored as given; no null/blank validation is done here
+    }
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/DefaultMQPushConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/DefaultMQPushConsumerInstrumentation.java
new file mode 100644
index 0000000000..d7b8ccdb82
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/DefaultMQPushConsumerInstrumentation.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v4.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class DefaultMQPushConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+    // Plugin definition: routes DefaultMQPushConsumer#registerMessageListener through RegisterMessageListenerInterceptor.
+    private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPushConsumer";
+    private static final String REGISTER_MESSAGE_LISTENER_METHOD_NAME = "registerMessageListener";
+    public static final String REGISTER_MESSAGE_LISTENER_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v4.RegisterMessageListenerInterceptor";
+
+    @Override
+    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[0]; // no constructor intercept points are declared
+    }
+
+    @Override
+    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[] {
+                new InstanceMethodsInterceptPoint() {
+                    @Override
+                    public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                        return named(REGISTER_MESSAGE_LISTENER_METHOD_NAME); // matches on method name only, with no argument-type filter
+                    }
+
+                    @Override
+                    public String getMethodsInterceptor() {
+                        return REGISTER_MESSAGE_LISTENER_INTERCEPT_CLASS;
+                    }
+
+                    @Override
+                    public boolean isOverrideArgs() {
+                        return false;
+                    }
+                }
+        };
+    }
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(ENHANCE_CLASS); // exact class-name match on the ENHANCE_CLASS constant
+    }
+    
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def
index 8fabe1be93..3558088167 100644
--- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def
@@ -18,3 +18,4 @@ rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.ConsumeMessageC
 rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.ConsumeMessageOrderlyInstrumentation
 rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.MQClientAPIImplInstrumentation
 rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.SendCallbackInstrumentation
+rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.DefaultMQPushConsumerInstrumentation
diff --git a/test/plugin/scenarios/rocketmq-scenario/config/expectedData.yaml b/test/plugin/scenarios/rocketmq-scenario/config/expectedData.yaml
index 7991de68f5..3d669b5f2d 100644
--- a/test/plugin/scenarios/rocketmq-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/rocketmq-scenario/config/expectedData.yaml
@@ -59,9 +59,11 @@ segmentItems:
             componentId: 39
             isError: false
             spanType: Entry
-            peer: ''
+            peer: not blank
             tags:
               - {key: transmission.latency, value: not null}
+              - {key: mq.topic, value: TopicTest }
+              - {key: mq.broker, value: not blank }
             refs:
               - {parentEndpoint: GET:/case/rocketmq-scenario, networkAddress: not null,
                  refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null,