You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2019/01/14 14:27:32 UTC

[incubator-skywalking] Diff for: [GitHub] wu-sheng closed pull request #2104: Support spring-rabbit plugin for 1.x and 2.x

diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml
index 13a4c4b02c..4b270c632e 100644
--- a/apm-sniffer/apm-sdk-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/pom.xml
@@ -63,6 +63,7 @@
 	<module>rabbitmq-5.x-plugin</module>
         <module>dubbo-conflict-patch</module>
         <module>canal-1.x-plugin</module>
+        <module>spring-rabbit-plugin</module>
     </modules>
     <packaging>pom</packaging>
 
diff --git a/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/pom.xml
new file mode 100644
index 0000000000..d330348b91
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>apm-sdk-plugin</artifactId>
+        <groupId>org.apache.skywalking</groupId>
+        <version>6.0.0-GA-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>apm-spring-rabbit-plugin</artifactId>
+    <name>spring-rabbit-plugin</name>
+    <packaging>jar</packaging>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <spring-rabbit.version>2.1.2.RELEASE</spring-rabbit.version>
+    </properties>
+
+    <dependencies>
+        <!-- Declared once, against one concrete release. The library is supplied by the monitored
+             application at runtime, so it is only needed on the compile classpath (provided). -->
+        <dependency>
+            <groupId>org.springframework.amqp</groupId>
+            <artifactId>spring-rabbit</artifactId>
+            <version>${spring-rabbit.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/java/org/apache/skywalking/apm/plugin/springrabbit/RabbitListenerInterceptor.java b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/java/org/apache/skywalking/apm/plugin/springrabbit/RabbitListenerInterceptor.java
new file mode 100644
index 0000000000..f26ee65812
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/java/org/apache/skywalking/apm/plugin/springrabbit/RabbitListenerInterceptor.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.skywalking.apm.plugin.springrabbit;
+
+import com.rabbitmq.client.Channel;
+import java.lang.reflect.Method;
+import java.util.Map;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.springframework.amqp.core.Message;
+
+/**
+ * {@link RabbitListenerInterceptor} creates an entry span around the delivery of a message to a spring-rabbit
+ * listener. The span is tagged with the broker address, the received exchange and the received routing key, and
+ * the trace context found in the message headers (if any) is extracted into the current tracing context.
+ *
+ * @author jjlu521016@gmail.com
+ */
+public class RabbitListenerInterceptor implements InstanceMethodsAroundInterceptor {
+    public static final String OPERATE_NAME_PREFIX = "RabbitMQ/";
+    public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";
+
+    /**
+     * Creates the consumer entry span before the listener method runs.
+     *
+     * @param objInst the enhanced instance (not used)
+     * @param method the intercepted method (not used)
+     * @param allArguments <code>[0]</code> is the {@link Channel}, <code>[1]</code> is the received {@link Message}
+     * @param argumentsTypes the argument types (not used)
+     * @param result the intercept result (not used)
+     */
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+                             MethodInterceptResult result) throws Throwable {
+        ContextCarrier contextCarrier = new ContextCarrier();
+        Channel channel = (Channel) allArguments[0];
+        Message message = (Message) allArguments[1];
+        String topic = message.getMessageProperties().getReceivedExchange();
+        String queue = message.getMessageProperties().getReceivedRoutingKey();
+        // the broker address is reported as host:port
+        String url = channel.getConnection().getAddress().getHostAddress() + ":" + channel.getConnection().getPort();
+
+        AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + "Topic/" + topic + "/Queue/" + queue + CONSUMER_OPERATE_NAME_SUFFIX, null).start(System.currentTimeMillis());
+        Tags.MQ_BROKER.set(activeSpan, url);
+        Tags.MQ_TOPIC.set(activeSpan, topic);
+        Tags.MQ_QUEUE.set(activeSpan, queue);
+        activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
+        SpanLayer.asMQ(activeSpan);
+
+        Map<String, Object> headers = message.getMessageProperties().getHeaders();
+        CarrierItem next = contextCarrier.items();
+        while (next.hasNext()) {
+            next = next.next();
+            // a message published without the trace header must not fail the interceptor: skip absent keys
+            Object headValue = headers.get(next.getHeadKey());
+            if (headValue != null) {
+                next.setHeadValue(headValue.toString());
+            }
+        }
+        ContextManager.extract(contextCarrier);
+    }
+
+    /**
+     * Stops the active span once the listener method has returned.
+     *
+     * @return <code>ret</code>, unchanged
+     */
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+                              Object ret) throws Throwable {
+        ContextManager.stopSpan();
+        return ret;
+    }
+
+    /**
+     * Marks the active span as failed and logs the throwable on it.
+     */
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+                                      Class<?>[] argumentsTypes, Throwable t) {
+        ContextManager.activeSpan().errorOccurred().log(t);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/java/org/apache/skywalking/apm/plugin/springrabbit/RabbitMQSendInterceptor.java b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/java/org/apache/skywalking/apm/plugin/springrabbit/RabbitMQSendInterceptor.java
new file mode 100644
index 0000000000..049b18b449
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/java/org/apache/skywalking/apm/plugin/springrabbit/RabbitMQSendInterceptor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.apm.plugin.springrabbit;
+
+import com.rabbitmq.client.Channel;
+import java.lang.reflect.Method;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.springframework.amqp.core.Message;
+
+/**
+ * Interceptor for the spring-rabbit send path. It opens an exit span for the outgoing message and copies the
+ * items of the trace context carrier into the message headers.
+ *
+ * @author jjlu521016@gmail.com
+ */
+public class RabbitMQSendInterceptor implements InstanceMethodsAroundInterceptor {
+    public static final String OPERATE_NAME_PREFIX = "RabbitMQ/";
+    public static final String PRODUCER_OPERATE_NAME_SUFFIX = "/Producer";
+
+    /**
+     * Starts the producer exit span and writes every carrier item into the message headers.
+     * <code>allArguments</code> is read as: <code>[0]</code> channel, <code>[1]</code> exchange name,
+     * <code>[2]</code> the name reported as queue, <code>[3]</code> the message being sent.
+     */
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+        MethodInterceptResult result) throws Throwable {
+        Channel channel = (Channel)allArguments[0];
+        Message message = (Message)allArguments[3];
+        String exchange = (String)allArguments[1];
+        String routingKey = (String)allArguments[2];
+
+        String peer = channel.getConnection().getAddress().getHostAddress() + ":" + channel.getConnection().getPort();
+        String operationName = OPERATE_NAME_PREFIX + "Topic/" + exchange + "/Queue/" + routingKey + PRODUCER_OPERATE_NAME_SUFFIX;
+
+        ContextCarrier carrier = new ContextCarrier();
+        AbstractSpan span = ContextManager.createExitSpan(operationName, carrier, peer);
+        Tags.MQ_BROKER.set(span, peer);
+        Tags.MQ_QUEUE.set(span, routingKey);
+        Tags.MQ_TOPIC.set(span, exchange);
+        SpanLayer.asMQ(span);
+        span.setComponent(ComponentsDefine.RABBITMQ_PRODUCER);
+
+        // propagate the trace context to the consumer through the message headers
+        for (CarrierItem item = carrier.items(); item.hasNext(); ) {
+            item = item.next();
+            message.getMessageProperties().setHeader(item.getHeadKey(), item.getHeadValue());
+        }
+        allArguments[3] = message;
+    }
+
+    /**
+     * Stops the exit span after the send method has returned and hands back <code>ret</code> unchanged.
+     */
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+        Object ret) throws Throwable {
+        ContextManager.stopSpan();
+        return ret;
+    }
+
+    /**
+     * Flags the active span as failed and logs the throwable on it.
+     */
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+        ContextManager.activeSpan().errorOccurred().log(t);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/java/org/apache/skywalking/apm/plugin/springrabbit/define/RabbitMQListenerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/java/org/apache/skywalking/apm/plugin/springrabbit/define/RabbitMQListenerInstrumentation.java
new file mode 100644
index 0000000000..d3c253a82f
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/java/org/apache/skywalking/apm/plugin/springrabbit/define/RabbitMQListenerInstrumentation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.apm.plugin.springrabbit.define;
+
+import com.rabbitmq.client.Channel;
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.agent.core.plugin.match.MultiClassNameMatch;
+import org.apache.skywalking.apm.plugin.springrabbit.RabbitListenerInterceptor;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+
+/**
+ * Plugin definition that routes the listener invocation of {@link AbstractMessageListenerContainer} to
+ * {@link RabbitListenerInterceptor}.
+ * <p>
+ * For spring-rabbit 2.x: {@link AbstractMessageListenerContainer#actualInvokeListener(Channel, Message)};
+ * for spring-rabbit 1.x: {@link AbstractMessageListenerContainer#invokeListener(Channel, Message)}.
+ *
+ * @author jjlu521016@gmail.com
+ */
+public class RabbitMQListenerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+    /**
+     * {@link RabbitListenerInterceptor}
+     */
+    public static final String INTERCEPTOR_TLISTENER_CLASS = "org.apache.skywalking.apm.plugin.springrabbit.RabbitListenerInterceptor";
+    public static final String ENHANCE_CLASS_COMSUMER = "org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer";
+    // method intercepted for spring-rabbit 2.x
+    public static final String ACTUALINVOKELISTENER_METHOD = "actualInvokeListener";
+    // method intercepted for spring-rabbit 1.x
+    public static final String INVOKELISTENER_METHOD = "invokeListener";
+
+    @Override
+    protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        // no constructor is enhanced
+        return new ConstructorInterceptPoint[0];
+    }
+
+    @Override
+    protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        InstanceMethodsInterceptPoint[] points = new InstanceMethodsInterceptPoint[2];
+        points[0] = listenerInterceptPoint(ACTUALINVOKELISTENER_METHOD);
+        // required for spring-rabbit 1.x; without it the 1.x listener is not intercepted
+        points[1] = listenerInterceptPoint(INVOKELISTENER_METHOD);
+        return points;
+    }
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS_COMSUMER);
+    }
+
+    /**
+     * Builds the intercept point for a listener method whose first two arguments are a channel and a message.
+     *
+     * @param methodName name of the method to match
+     * @return an intercept point that hands the matched method to {@link RabbitListenerInterceptor}
+     */
+    private InstanceMethodsInterceptPoint listenerInterceptPoint(final String methodName) {
+        return new InstanceMethodsInterceptPoint() {
+            @Override
+            public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                return named(methodName)
+                    .and(takesArgumentWithType(0, "com.rabbitmq.client.Channel"))
+                    .and(takesArgumentWithType(1, "org.springframework.amqp.core.Message"));
+            }
+
+            @Override
+            public String getMethodsInterceptor() {
+                return INTERCEPTOR_TLISTENER_CLASS;
+            }
+
+            @Override
+            public boolean isOverrideArgs() {
+                return true;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/java/org/apache/skywalking/apm/plugin/springrabbit/define/RabbitMQSendInstrumentation.java b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/java/org/apache/skywalking/apm/plugin/springrabbit/define/RabbitMQSendInstrumentation.java
new file mode 100644
index 0000000000..80bc880a6e
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/java/org/apache/skywalking/apm/plugin/springrabbit/define/RabbitMQSendInstrumentation.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.apm.plugin.springrabbit.define;
+
+import com.rabbitmq.client.Channel;
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.agent.core.plugin.match.MultiClassNameMatch;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+/**
+ * Plugin definition that routes the six-argument <code>doSend</code> method of {@link RabbitTemplate} to
+ * the interceptor class named by {@link #INTERCEPTOR_CLASS}.
+ *
+ * @author jjlu521016@gmail.com
+ */
+public class RabbitMQSendInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.springrabbit.RabbitMQSendInterceptor";
+    public static final String ENHANCE_CLASS_PRODUCER = "org.springframework.amqp.rabbit.core.RabbitTemplate";
+    /**
+     * {@link RabbitTemplate#doSend(Channel, String, String, Message, boolean, CorrelationData)}
+     */
+    public static final String ENHANCE_METHOD_DISPATCH = "doSend";
+
+    @Override
+    protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        // no constructor is enhanced
+        return new ConstructorInterceptPoint[0];
+    }
+
+    @Override
+    protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[] {doSendInterceptPoint()};
+    }
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS_PRODUCER);
+    }
+
+    /**
+     * Describes the single intercepted method: any method named <code>doSend</code> that takes six arguments.
+     *
+     * @return the intercept point bound to {@link #INTERCEPTOR_CLASS}
+     */
+    private InstanceMethodsInterceptPoint doSendInterceptPoint() {
+        return new InstanceMethodsInterceptPoint() {
+            @Override
+            public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                return named(ENHANCE_METHOD_DISPATCH).and(takesArguments(6));
+            }
+
+            @Override
+            public String getMethodsInterceptor() {
+                return INTERCEPTOR_CLASS;
+            }
+
+            @Override
+            public boolean isOverrideArgs() {
+                return true;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/resources/skywalking-plugin.def
new file mode 100644
index 0000000000..691840caa6
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/main/resources/skywalking-plugin.def
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spring-rabbit=org.apache.skywalking.apm.plugin.springrabbit.define.RabbitMQSendInstrumentation
+spring-rabbit=org.apache.skywalking.apm.plugin.springrabbit.define.RabbitMQListenerInstrumentation
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/test/java/org/apache/skywalking/apm/plugin/springrabbit/RabbitListenerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/test/java/org/apache/skywalking/apm/plugin/springrabbit/RabbitListenerInterceptorTest.java
new file mode 100644
index 0000000000..181c6b1a0a
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/test/java/org/apache/skywalking/apm/plugin/springrabbit/RabbitListenerInterceptorTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.skywalking.apm.plugin.springrabbit;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageProperties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link RabbitListenerInterceptor}, driven with a mocked channel and a real message.
+ *
+ * @author jjlu521016@gmail.com
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class RabbitListenerInterceptorTest {
+
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+
+    @Rule
+    public AgentServiceRule serviceRule = new AgentServiceRule();
+
+    private RabbitListenerInterceptor rabbitListenerInterceptor;
+
+    private Object[] arguments;
+
+    @Before
+    public void setUp() throws Exception {
+        rabbitListenerInterceptor = new RabbitListenerInterceptor();
+        arguments = buildArgs();
+    }
+
+    /**
+     * One before/after round trip through the interceptor must produce exactly one trace segment.
+     */
+    @Test
+    public void TestRabbitMQConsumerInterceptor() throws Throwable {
+        rabbitListenerInterceptor.beforeMethod(null, null, arguments, null, null);
+        rabbitListenerInterceptor.afterMethod(null, null, arguments, null, null);
+        List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
+        Assert.assertThat(traceSegments.size(), is(1));
+    }
+
+    /**
+     * Builds the argument array passed to the interceptor: a mocked channel and a message with headers.
+     *
+     * @return <code>{channel, message}</code>
+     */
+    public Object[] buildArgs() {
+        Connection connection = mock(Connection.class);
+        when(connection.getId()).thenReturn("connection-1");
+        when(connection.getAddress()).thenReturn(getAddress());
+        when(connection.getPort()).thenReturn(5672);
+        Channel channel = mock(Channel.class);
+        when(channel.getConnection()).thenReturn(connection);
+        when(channel.getChannelNumber()).thenReturn(1);
+
+        MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setHeader("test", "myheader1");
+        // header value stored under the "sw6" key, which the interceptor reads back from the message
+        messageProperties.setHeader("sw6", "1-NTguOTEuMTU0NjE4MTM2NDQ4NTAwMDE=-NTcuNTYuMTU0NjE4MTM2NDYzNTAwMDA=-1-57-58-IzEyNy4wLjAuMTo1Njcy-Iy9oYWhh-Iy9oYWhh");
+        byte[] body = "Test message".getBytes(Charset.forName("UTF-8"));
+        return new Object[] {channel, new Message(body, messageProperties)};
+    }
+
+    /**
+     * Resolves the loopback address used as the mocked broker address.
+     *
+     * @return the address for <code>127.0.0.1</code>
+     * @throws IllegalStateException if the address cannot be resolved, so the test fails at the real cause
+     */
+    public InetAddress getAddress() {
+        try {
+            return InetAddress.getByName("127.0.0.1");
+        } catch (UnknownHostException e) {
+            throw new IllegalStateException("cannot resolve 127.0.0.1", e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/test/java/org/apache/skywalking/apm/plugin/springrabbit/RabbitMQSendInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/test/java/org/apache/skywalking/apm/plugin/springrabbit/RabbitMQSendInterceptorTest.java
new file mode 100644
index 0000000000..277958e1eb
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/spring-rabbit-plugin/src/test/java/org/apache/skywalking/apm/plugin/springrabbit/RabbitMQSendInterceptorTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.apm.plugin.springrabbit;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageProperties;
+
+import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.RABBITMQ_PRODUCER;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit test for {@link RabbitMQSendInterceptor}, driven with a mocked channel and a real message.
+ *
+ * @author jjlu521016@gmail.com
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class RabbitMQSendInterceptorTest {
+
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+
+    @Rule
+    public AgentServiceRule serviceRule = new AgentServiceRule();
+
+    private RabbitMQSendInterceptor rabbitMQSendInterceptor;
+
+    private Object[] arguments;
+
+    @Before
+    public void setUp() throws Exception {
+        rabbitMQSendInterceptor = new RabbitMQSendInterceptor();
+        arguments = buildArgs();
+    }
+
+    /**
+     * One before/after round trip must produce one segment holding one producer span.
+     */
+    @Test
+    public void TestRabbitMQProducerInterceptor() throws Throwable {
+        rabbitMQSendInterceptor.beforeMethod(null, null, arguments, null, null);
+        rabbitMQSendInterceptor.afterMethod(null, null, arguments, null, null);
+
+        List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
+        assertThat(traceSegmentList.size(), is(1));
+
+        List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegmentList.get(0));
+        assertThat(spans.size(), is(1));
+        assertRabbitMQSpan(spans.get(0));
+    }
+
+    /**
+     * Checks the first two tags, the component, the layer and the operation name of the producer span.
+     */
+    private void assertRabbitMQSpan(AbstractTracingSpan span) {
+        SpanAssert.assertTag(span, 0, "127.0.0.1:5672");
+        SpanAssert.assertTag(span, 1, "spring-rabbit-test");
+        SpanAssert.assertComponent(span, RABBITMQ_PRODUCER);
+        SpanAssert.assertLayer(span, SpanLayer.MQ);
+        assertThat(span.getOperationName(), is("RabbitMQ/Topic/topic-1/Queue/spring-rabbit-test/Producer"));
+    }
+
+    /**
+     * Builds the six-element argument array passed to the interceptor.
+     *
+     * @return <code>{channel, "topic-1", "spring-rabbit-test", message, true, null}</code>
+     */
+    public Object[] buildArgs() {
+        Connection connection = mock(Connection.class);
+        when(connection.getId()).thenReturn("connection-1");
+        when(connection.getAddress()).thenReturn(getAddress());
+        when(connection.getPort()).thenReturn(5672);
+        Channel channel = mock(Channel.class);
+        when(channel.getConnection()).thenReturn(connection);
+        when(channel.getChannelNumber()).thenReturn(1);
+
+        MessageProperties messageProperties = new MessageProperties();
+        messageProperties.setHeader("test", "myheader1");
+        byte[] body = "Test message".getBytes(Charset.forName("UTF-8"));
+        return new Object[] {channel, "topic-1", "spring-rabbit-test", new Message(body, messageProperties), true, null};
+    }
+
+    /**
+     * Resolves the loopback address used as the mocked broker address.
+     *
+     * @return the address for <code>127.0.0.1</code>
+     * @throws IllegalStateException if the address cannot be resolved, so the test fails at the real cause
+     */
+    public InetAddress getAddress() {
+        try {
+            return InetAddress.getByName("127.0.0.1");
+        } catch (UnknownHostException e) {
+            throw new IllegalStateException("cannot resolve 127.0.0.1", e);
+        }
+    }
+}
\ No newline at end of file


With regards,
Apache Git Services