You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/09/25 05:25:00 UTC

[skywalking] branch master updated: Add pulsar apm plugin (#3476)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new a874251  Add pulsar apm plugin (#3476)
a874251 is described below

commit a87425147c315ca155c382f5c0c55b569ad6fd9b
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Sep 25 13:24:54 2019 +0800

    Add pulsar apm plugin (#3476)
    
    * Add pulsar apm plugin.
    
    * Fix check style
    
    * Fix pulsar consumer component define.
    
    * Add pulsar to component-libraries.yml
    
    * Fix error interceptor class.
    
    * Add pulsar to agent support list.
    
    * Add Pulsar to ComponentsDefine and component-libraries.yml
    
    * Move create entry span log of consumer from after method to before method
    
    * Fix send callback issue when exception cause.
    
    * Fix test issues
    
    * Move pulsar plugin to optional plugins
    
    * Add none messages tests for interceptor of producer and consumer.
    
    * Remove unused comments.
    
    * Move pulsar plugin back to the apm-sdk-plugin
    
    * Fix comments
    
    * remove set startTime for entry span(default is set by System.currentTimeMillis)
    
    * Fix comments
---
 .../network/trace/component/ComponentsDefine.java  |   5 +-
 apm-sniffer/apm-sdk-plugin/pom.xml                 |   1 +
 apm-sniffer/apm-sdk-plugin/pulsar-plugin/pom.xml   |  42 ++++++
 .../pulsar/ConsumerConstructorInterceptor.java     |  52 ++++++++
 .../plugin/pulsar/ConsumerEnhanceRequiredInfo.java |  67 ++++++++++
 .../pulsar/ProducerConstructorInterceptor.java     |  49 +++++++
 .../plugin/pulsar/ProducerEnhanceRequiredInfo.java |  53 ++++++++
 .../plugin/pulsar/PulsarConsumerInterceptor.java   |  91 +++++++++++++
 .../plugin/pulsar/PulsarProducerInterceptor.java   | 110 ++++++++++++++++
 .../pulsar/SendCallbackEnhanceRequiredInfo.java    |  56 ++++++++
 .../apm/plugin/pulsar/SendCallbackInterceptor.java |  84 ++++++++++++
 .../define/PulsarConsumerInstrumentation.java      |  95 ++++++++++++++
 .../define/PulsarProducerInstrumentation.java      | 105 +++++++++++++++
 .../pulsar/define/SendCallbackInstrumentation.java |  75 +++++++++++
 .../src/main/resources/skywalking-plugin.def       |  19 +++
 .../pulsar/ConsumerConstructorInterceptorTest.java |  82 ++++++++++++
 .../skywalking/apm/plugin/pulsar/MockMessage.java  |  69 ++++++++++
 .../pulsar/ProducerConstructorInterceptorTest.java |  75 +++++++++++
 .../pulsar/PulsarConsumerInterceptorTest.java      | 136 +++++++++++++++++++
 .../pulsar/PulsarProducerInterceptorTest.java      | 114 ++++++++++++++++
 .../plugin/pulsar/SendCallbackInterceptorTest.java | 144 +++++++++++++++++++++
 .../service-agent/java-agent/Supported-list.md     |   1 +
 .../src/main/resources/component-libraries.yml     |  14 +-
 23 files changed, 1536 insertions(+), 3 deletions(-)

diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
index c79b6ea..7bb53c8 100755
--- a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
+++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
@@ -133,7 +133,10 @@ public class ComponentsDefine {
     public static final OfficialComponent PLAY = new OfficialComponent(68, "Play");
 
     public static final OfficialComponent CASSANDRA_JAVA_DRIVER = new OfficialComponent(69, "cassandra-java-driver");
-
+  
     public static final OfficialComponent LIGHT_4J = new OfficialComponent(71, "Light4J");
 
+    public static final OfficialComponent PULSAR_PRODUCER = new OfficialComponent(73, "pulsar-producer");
+
+    public static final OfficialComponent PULSAR_CONSUMER = new OfficialComponent(74, "pulsar-consumer");
 }
diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml
index 886dadb..d38c4bc 100644
--- a/apm-sniffer/apm-sdk-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/pom.xml
@@ -76,6 +76,7 @@
         <module>solrj-7.x-plugin</module>
         <module>cassandra-java-driver-3.x-plugin</module>
         <module>light4j-plugins</module>
+        <module>pulsar-plugin</module>
     </modules>
     <packaging>pom</packaging>
 
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/pom.xml
new file mode 100644
index 0000000..bbb9e9e
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>apm-sdk-plugin</artifactId>
+        <groupId>org.apache.skywalking</groupId>
+        <version>6.5.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>apm-pulsar-plugin</artifactId>
+
+    <properties>
+        <pulsar.version>2.4.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client</artifactId>
+            <version>${pulsar.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
new file mode 100644
index 0000000..8965c93
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+/**
+ * Interceptor of pulsar consumer constructor.
+ *
+ * The interceptor create {@link ConsumerEnhanceRequiredInfo} which is required by instance method interceptor,
+ * So use it to update the skywalking dynamic field of pulsar consumer enhanced instance.
+ * So that the instance methods can get the {@link ConsumerEnhanceRequiredInfo}
+ *
+ * @author penghui
+ */
+public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
+
+    @Override
+    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        PulsarClientImpl pulsarClient = (PulsarClientImpl) allArguments[0];
+        String topic = (String) allArguments[1];
+        ConsumerConfigurationData consumerConfigurationData = (ConsumerConfigurationData) allArguments[2];
+        ConsumerEnhanceRequiredInfo requireInfo = new ConsumerEnhanceRequiredInfo();
+        /*
+         * Pulsar url can specify with specific URL or a service url provider, use pulsarClient.getLookup().getServiceUrl()
+         * can handle the service url provider which use a dynamic service url
+         */
+        requireInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
+        requireInfo.setTopic(topic);
+        requireInfo.setSubscriptionName(consumerConfigurationData.getSubscriptionName());
+        objInst.setSkyWalkingDynamicField(requireInfo);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
new file mode 100644
index 0000000..a1276d8
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+/**
+ * Pulsar consumer enhance required info is required by consumer enhanced object method interceptor
+ *
+ * @author penghui
+ */
+public class ConsumerEnhanceRequiredInfo {
+
+    /**
+     * service url of the consumer
+     */
+    private String serviceUrl;
+
+    /**
+     * topic name of the consumer
+     */
+    private String topic;
+
+    /**
+     * subscription name of the consumer
+     */
+    private String subscriptionName;
+
+    public String getServiceUrl() {
+        return serviceUrl;
+    }
+
+    public void setServiceUrl(String serviceUrl) {
+        this.serviceUrl = serviceUrl;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getSubscriptionName() {
+        return subscriptionName;
+    }
+
+    public void setSubscriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptor.java
new file mode 100644
index 0000000..8db8120
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+/**
+ * Interceptor of pulsar producer constructor.
+ *
+ * The interceptor create {@link ProducerEnhanceRequiredInfo} which is required by instance method interceptor,
+ * So use it to update the skywalking dynamic field of pulsar producer enhanced instance.
+ * So that the instance methods can get the {@link ProducerEnhanceRequiredInfo}
+ *
+ * @author penghui
+ */
+public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {
+
+    @Override
+    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        PulsarClientImpl pulsarClient = (PulsarClientImpl) allArguments[0];
+        String topic = (String) allArguments[1];
+        ProducerEnhanceRequiredInfo producerEnhanceRequiredInfo = new ProducerEnhanceRequiredInfo();
+        producerEnhanceRequiredInfo.setTopic(topic);
+        /*
+         * Pulsar url can specify with specific URL or a service url provider, use pulsarClient.getLookup().getServiceUrl()
+         * can handle the service url provider which use a dynamic service url
+         */
+        producerEnhanceRequiredInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
+        objInst.setSkyWalkingDynamicField(producerEnhanceRequiredInfo);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerEnhanceRequiredInfo.java
new file mode 100644
index 0000000..210e66a
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerEnhanceRequiredInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+/**
+ * Pulsar producer enhance required info is required by producer enhanced object method interceptor
+ *
+ * @author penghui
+ */
+public class ProducerEnhanceRequiredInfo {
+
+    /**
+     * service url of the pulsar producer
+     */
+    private String serviceUrl;
+
+    /**
+     * topic name of the pulsar producer
+     */
+    private String topic;
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getServiceUrl() {
+        return serviceUrl;
+    }
+
+    public void setServiceUrl(String serviceUrl) {
+        this.serviceUrl = serviceUrl;
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
new file mode 100644
index 0000000..61ebe04
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+import java.lang.reflect.Method;
+
+/**
+ * Interceptor for pulsar consumer enhanced instance
+ *
+ * Here is the intercept process steps:
+ *
+ * <pre>
+ *  1. Get the @{@link ConsumerEnhanceRequiredInfo} and record the service url, topic name and subscription name
+ *  2. Create the entry span when call <code>messageProcessed</code> method
+ *  3. Extract all the <code>Trace Context</code> when call <code>messageProcessed</code> method
+ *  4. Stop the entry span when <code>messageProcessed</code> method finished.
+ * </pre>
+ *
+ * @author penghui
+ */
+public class PulsarConsumerInterceptor implements InstanceMethodsAroundInterceptor {
+
+    public static final String OPERATE_NAME_PREFIX = "Pulsar/";
+    public static final String CONSUMER_OPERATE_NAME = "/Consumer/";
+
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+                             MethodInterceptResult result) throws Throwable {
+        if (allArguments[0] != null) {
+            ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
+            Message msg = (Message) allArguments[0];
+            ContextCarrier carrier = new ContextCarrier();
+            CarrierItem next = carrier.items();
+            while (next.hasNext()) {
+                next = next.next();
+                next.setHeadValue(msg.getProperty(next.getHeadKey()));
+            }
+            AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX +
+                    requiredInfo.getTopic() + CONSUMER_OPERATE_NAME + requiredInfo.getSubscriptionName(), carrier);
+            activeSpan.setComponent(ComponentsDefine.PULSAR_CONSUMER);
+            SpanLayer.asMQ(activeSpan);
+            Tags.MQ_BROKER.set(activeSpan, requiredInfo.getServiceUrl());
+            Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopic());
+        }
+    }
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+                              Object ret) throws Throwable {
+        if (allArguments[0] != null) {
+            ContextManager.stopSpan();
+        }
+        return ret;
+    }
+
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+                                      Class<?>[] argumentsTypes, Throwable t) {
+        if (allArguments[0] != null) {
+            ContextManager.activeSpan().errorOccurred().log(t);
+        }
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptor.java
new file mode 100644
index 0000000..3205973
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+import java.lang.reflect.Method;
+
+/**
+ * Interceptor for pulsar producer enhanced instance.
+ *
+ * Here is the intercept process steps:
+ *
+ * <pre>
+ *  1. Get the {@link ProducerEnhanceRequiredInfo} and record the service url, topic name
+ *  2. Create the exit span when the producer invoke <code>sendAsync</code> method
+ *  3. Inject the context to {@link Message#getProperties()}
+ *  4. Create {@link SendCallbackEnhanceRequiredInfo} with <code>ContextManager.capture()</code> and set the
+ *     callback enhanced instance skywalking dynamic field to the created required info.
+ *  5. Stop the exit span when <code>sendAsync</code> method finished.
+ * </pre>
+ *
+ * @author penghui
+ */
+public class PulsarProducerInterceptor implements InstanceMethodsAroundInterceptor {
+
+    public static final String OPERATE_NAME_PREFIX = "Pulsar/";
+    public static final String PRODUCER_OPERATE_NAME_SUFFIX = "/Producer";
+
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+                             MethodInterceptResult result) throws Throwable {
+        if (allArguments[0] != null) {
+            ProducerEnhanceRequiredInfo requiredInfo = (ProducerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
+            ContextCarrier contextCarrier = new ContextCarrier();
+            String topicName = requiredInfo.getTopic();
+            AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName +
+                    PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, requiredInfo.getServiceUrl());
+            Tags.MQ_BROKER.set(activeSpan, requiredInfo.getServiceUrl());
+            Tags.MQ_TOPIC.set(activeSpan, topicName);
+            SpanLayer.asMQ(activeSpan);
+            activeSpan.setComponent(ComponentsDefine.PULSAR_PRODUCER);
+            CarrierItem next = contextCarrier.items();
+            MessageImpl msg = (MessageImpl) allArguments[0];
+            while (next.hasNext()) {
+                next = next.next();
+                msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder()
+                        .setKey(next.getHeadKey())
+                        .setValue(next.getHeadValue()));
+            }
+            if (allArguments.length > 1) {
+                EnhancedInstance callbackInstance = (EnhancedInstance) allArguments[1];
+                if (callbackInstance != null) {
+                    ContextSnapshot snapshot = ContextManager.capture();
+                    if (null != snapshot) {
+                        SendCallbackEnhanceRequiredInfo callbackRequiredInfo = new SendCallbackEnhanceRequiredInfo();
+                        callbackRequiredInfo.setTopic(topicName);
+                        callbackRequiredInfo.setContextSnapshot(snapshot);
+                        callbackInstance.setSkyWalkingDynamicField(callbackRequiredInfo);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+                              Object ret) throws Throwable {
+        if (allArguments[0] != null) {
+            ContextManager.stopSpan();
+        }
+        return ret;
+    }
+
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+        if (allArguments[0] != null) {
+            ContextManager.activeSpan().errorOccurred().log(t);
+        }
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackEnhanceRequiredInfo.java
new file mode 100644
index 0000000..06b0c8f
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackEnhanceRequiredInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+
+/**
+ * Pulsar {@link org.apache.pulsar.client.impl.SendCallback} enhance required info is required by
+ * <code>SendCallback</code> enhanced object method interceptor
+ *
+ * @author penghui
+ */
+public class SendCallbackEnhanceRequiredInfo {
+
+    /**
+     * topic name of the producer
+     */
+    private String topic;
+
+    /**
+     * context snapshot
+     */
+    ContextSnapshot contextSnapshot;
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public ContextSnapshot getContextSnapshot() {
+        return contextSnapshot;
+    }
+
+    public void setContextSnapshot(ContextSnapshot contextSnapshot) {
+        this.contextSnapshot = contextSnapshot;
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptor.java
new file mode 100644
index 0000000..f5573ef
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+import java.lang.reflect.Method;
+
+/**
+ * Interceptor for send callback enhanced instance.
+ *
+ * Here is the intercept process steps:
+ *
+ * <pre>
+ *  1. Get the @{@link SendCallbackEnhanceRequiredInfo} and record the service url, context snapshot
+ *  2. Create the local span when the callback invoke <code>sendComplete</code> method
+ *  3. Stop the local span when <code>sendComplete</code> method finished.
+ * </pre>
+ *
+ * @author penghui
+ */
+public class SendCallbackInterceptor implements InstanceMethodsAroundInterceptor {
+
+    private static final String OPERATION_NAME = "Pulsar/Producer/Callback";
+
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+                             MethodInterceptResult result) throws Throwable {
+        SendCallbackEnhanceRequiredInfo requiredInfo = (SendCallbackEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
+        if (null != requiredInfo.getContextSnapshot()) {
+            AbstractSpan activeSpan = ContextManager.createLocalSpan(OPERATION_NAME);
+            activeSpan.setComponent(ComponentsDefine.PULSAR_PRODUCER);
+            Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopic());
+            SpanLayer.asMQ(activeSpan);
+            ContextManager.continued(requiredInfo.getContextSnapshot());
+        }
+    }
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+                              Object ret) throws Throwable {
+        SendCallbackEnhanceRequiredInfo requiredInfo = (SendCallbackEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
+        if (null != requiredInfo.getContextSnapshot()) {
+            Exception exceptions = (Exception) allArguments[0];
+            if (exceptions != null) {
+                ContextManager.activeSpan().errorOccurred().log(exceptions);
+            }
+            ContextManager.stopSpan();
+        }
+        return ret;
+    }
+
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+                                      Class<?>[] argumentsTypes, Throwable t) {
+        SendCallbackEnhanceRequiredInfo requiredInfo = (SendCallbackEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
+        if (null != requiredInfo.getContextSnapshot()) {
+            ContextManager.activeSpan().errorOccurred().log(t);
+        }
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarConsumerInstrumentation.java
new file mode 100644
index 0000000..e52a774
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarConsumerInstrumentation.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * The pulsar consumer instrumentation use {@link org.apache.pulsar.client.impl.ConsumerImpl} as an enhanced class.
+ * {@link org.apache.pulsar.client.api.Consumer} is a user-oriented interface and the implementations are
+ * {@link org.apache.pulsar.client.impl.ConsumerImpl} and {@link org.apache.pulsar.client.impl.MultiTopicsConsumerImpl}
+ *
+ * The MultiTopicsConsumerImpl is a complex type with multiple ConsumerImpl to support uses receive messages from
+ * multiple topics. As each ConsumerImpl has it's own topic name and it is the initial unit of a single topic
+ * to receiving messages, so use ConsumerImpl as an enhanced class is an effective way.
+ *
+ * Use <code>messageProcessed</code> as the enhanced method since pulsar
+ * consumer has multiple ways to receiving messages such as sync method, async method and listeners.
+ * Method messageProcessed is a basic unit of ConsumerImpl, no matter which way uses uses, messageProcessed will always
+ * record the message receiving.
+ *
+ * @author penghui
+ */
+public class PulsarConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+    public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.pulsar.client.impl.PulsarClientImpl";
+    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.ConsumerConstructorInterceptor";
+    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.PulsarConsumerInterceptor";
+    public static final String ENHANCE_METHOD = "messageProcessed";
+    public static final String ENHANCE_METHOD_TYPE = "org.apache.pulsar.client.api.Message";
+    public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.ConsumerImpl";
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(ENHANCE_CLASS);
+    }
+
+    @Override
+    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[] {
+            new ConstructorInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
+                    return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
+                }
+
+                @Override public String getConstructorInterceptor() {
+                    return CONSTRUCTOR_INTERCEPTOR_CLASS;
+                }
+            }
+        };
+    }
+
+    @Override
+    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[] {
+            new InstanceMethodsInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return named(ENHANCE_METHOD).and(takesArgumentWithType(0, ENHANCE_METHOD_TYPE));
+                }
+
+                @Override public String getMethodsInterceptor() {
+                    return INTERCEPTOR_CLASS;
+                }
+
+                @Override public boolean isOverrideArgs() {
+                    return false;
+                }
+            }
+        };
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarProducerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarProducerInstrumentation.java
new file mode 100644
index 0000000..4a38f8f
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarProducerInstrumentation.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Pulsar producer instrumentation.
+ *
+ * The pulsar producer instrumentation use {@link org.apache.pulsar.client.impl.ProducerImpl} as an enhanced class.
+ * {@link org.apache.pulsar.client.api.Producer} is a user-oriented interface and the implementations of the Producer
+ * are {@link org.apache.pulsar.client.impl.PartitionedProducerImpl} and {@link org.apache.pulsar.client.impl.ProducerImpl}.
+ *
+ * And the PartitionedProducerImpl is a complex type with multiple ProducerImpl to support uses send messages to
+ * multiple partitions. As each ProducerImpl has it's own topic name and it is the initial unit of a single topic
+ * to send messages, so use ProducerImpl as an enhanced class is an effective way.
+ *
+ * About the enhanced methods, currently use {@link org.apache.pulsar.client.impl.ProducerImpl#sendAsync(
+ * org.apache.pulsar.client.api.Message, org.apache.pulsar.client.impl.SendCallback)} as the enhanced method.
+ * Pulsar provides users with two kinds of methods for sending messages sync methods and async methods. The async method
+ * use {@link java.util.concurrent.CompletableFuture as the method result}, if use this method as the enhanced method
+ * is hard to pass the snapshot of span, because can't ensure that the CompletableFuture is completed after the skywalking
+ * dynamic field was updated. But execution time of sync method will be inaccurate.
+ *
+ * @author penghui
+ */
+public class PulsarProducerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.PulsarProducerInterceptor";
+    public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.ProducerImpl";
+    public static final String ENHANCE_METHOD = "sendAsync";
+    public static final String ENHANCE_METHOD_TYPE = "org.apache.pulsar.client.impl.SendCallback";
+
+    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.ProducerConstructorInterceptor";
+    public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "org.apache.pulsar.client.impl.PulsarClientImpl";
+
+    @Override
+    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[]{
+            new ConstructorInterceptPoint() {
+                @Override
+                public ElementMatcher<MethodDescription> getConstructorMatcher() {
+                    return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG);
+                }
+
+                @Override
+                public String getConstructorInterceptor() {
+                    return CONSTRUCTOR_INTERCEPTOR_CLASS;
+                }
+            }
+        };
+    }
+
+    @Override
+    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[]{
+            new InstanceMethodsInterceptPoint() {
+                @Override
+                public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return named(ENHANCE_METHOD).and(takesArgumentWithType(1, ENHANCE_METHOD_TYPE));
+                }
+
+                @Override
+                public String getMethodsInterceptor() {
+                    return INTERCEPTOR_CLASS;
+                }
+
+                @Override
+                public boolean isOverrideArgs() {
+                    return false;
+                }
+            }
+        };
+    }
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(ENHANCE_CLASS);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/SendCallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/SendCallbackInstrumentation.java
new file mode 100644
index 0000000..865015c
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/SendCallbackInstrumentation.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.pulsar.SendCallbackEnhanceRequiredInfo;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+
+/**
+ * Pulsar producer send callback instrumentation.
+ *
+ * The send callback enhanced object will use {@link org.apache.skywalking.apm.plugin.pulsar.SendCallbackEnhanceRequiredInfo}
+ * which {@link org.apache.skywalking.apm.plugin.pulsar.PulsarProducerInterceptor} set by skywalking dynamic field of
+ * enhanced object.
+ *
+ * When a callback is complete, {@link org.apache.skywalking.apm.plugin.pulsar.SendCallbackInterceptor} will continue
+ * the {@link SendCallbackEnhanceRequiredInfo#getContextSnapshot()}.
+ *
+ * @author penghui
+ */
+public class SendCallbackInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+    public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.SendCallback";
+    public static final String ENHANCE_METHOD = "sendComplete";
+    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.SendCallbackInterceptor";
+
+    @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[0];
+    }
+
+    @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[] {
+            new InstanceMethodsInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return named(ENHANCE_METHOD);
+                }
+
+                @Override public String getMethodsInterceptor() {
+                    return INTERCEPTOR_CLASS;
+                }
+
+                @Override public boolean isOverrideArgs() {
+                    return false;
+                }
+            }
+        };
+    }
+
+    @Override protected ClassMatch enhanceClass() {
+        return byHierarchyMatch(new String[] {ENHANCE_CLASS});
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
new file mode 100644
index 0000000..a59280e
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+pulsar=org.apache.skywalking.apm.plugin.pulsar.define.SendCallbackInstrumentation
+pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarConsumerInstrumentation
+pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarProducerInstrumentation
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
new file mode 100644
index 0000000..371f45d
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConsumerConstructorInterceptorTest {
+
+    private static final String SERVICE_URL = "pulsar://localhost:6650";
+    private static final String TOPIC_NAME = "persistent://my-tenant/my-ns/my-topic";
+    private static final String SUBSCRIPTION_NAME = "my-sub";
+
+    @Mock
+    private PulsarClientImpl pulsarClient;
+
+    @Mock
+    private LookupService lookupService;
+
+    @Mock
+    private ConsumerConfigurationData consumerConfigurationData;
+
+    private ConsumerConstructorInterceptor constructorInterceptor;
+
+    private EnhancedInstance enhancedInstance = new EnhancedInstance() {
+
+        private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
+
+        @Override public Object getSkyWalkingDynamicField() {
+            return consumerEnhanceRequiredInfo;
+        }
+
+        @Override public void setSkyWalkingDynamicField(Object value) {
+            consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo)value;
+        }
+    };
+
+    @Before
+    public void setUp() {
+        when(lookupService.getServiceUrl()).thenReturn(SERVICE_URL);
+        when(pulsarClient.getLookup()).thenReturn(lookupService);
+        when(consumerConfigurationData.getSubscriptionName()).thenReturn(SUBSCRIPTION_NAME);
+        constructorInterceptor = new ConsumerConstructorInterceptor();
+    }
+
+    @Test
+    public void testOnConsumer() {
+        constructorInterceptor.onConstruct(enhancedInstance, new Object[] {pulsarClient, TOPIC_NAME, consumerConfigurationData});
+        ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
+        assertThat(requiredInfo.getServiceUrl(), is(SERVICE_URL));
+        assertThat(requiredInfo.getTopic(), is(TOPIC_NAME));
+        assertThat(requiredInfo.getSubscriptionName(), is(SUBSCRIPTION_NAME));
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java
new file mode 100644
index 0000000..3162792
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MockMessage extends MessageImpl {
+
+    private PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
+
+    private transient Map<String, String> properties;
+
+    public MockMessage() {
+        this(null, "1:1", new HashMap(), null, null);
+    }
+
+    public MockMessage(String topic, String msgId, Map properties, ByteBuf payload, Schema schema) {
+        super(topic, msgId, properties, payload, schema);
+    }
+
+    @Override
+    public PulsarApi.MessageMetadata.Builder getMessageBuilder() {
+        return msgMetadataBuilder;
+    }
+
+    public synchronized Map<String, String> getProperties() {
+        if (this.properties == null) {
+            if (this.msgMetadataBuilder.getPropertiesCount() > 0) {
+                Map<String, String> internalProperties = new HashMap<String, String>();
+                for (int i = 0; i < this.msgMetadataBuilder.getPropertiesCount(); i++) {
+                    PulsarApi.KeyValue kv = this.msgMetadataBuilder.getProperties(i);
+                    internalProperties.put(kv.getKey(), kv.getValue());
+                }
+                this.properties = Collections.unmodifiableMap(internalProperties);
+            } else {
+                this.properties = Collections.emptyMap();
+            }
+        }
+        return this.properties;
+    }
+
+    @Override
+    public String getProperty(String name) {
+        return this.getProperties().get(name);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptorTest.java
new file mode 100644
index 0000000..eaf4aa8
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptorTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProducerConstructorInterceptorTest {
+
+    private static final String SERVICE_URL = "pulsar://localhost:6650";
+    private static final String TOPIC_NAME = "persistent://my-tenant/my-ns/my-topic";
+
+    @Mock
+    private PulsarClientImpl pulsarClient;
+
+    @Mock
+    private LookupService lookupService;
+
+    private ProducerConstructorInterceptor constructorInterceptor;
+
+    private EnhancedInstance enhancedInstance = new EnhancedInstance() {
+
+        private ProducerEnhanceRequiredInfo requiredInfo;
+
+        @Override public Object getSkyWalkingDynamicField() {
+            return requiredInfo;
+        }
+
+        @Override public void setSkyWalkingDynamicField(Object value) {
+            this.requiredInfo = (ProducerEnhanceRequiredInfo)value;
+        }
+    };
+
+    @Before
+    public void setUp() {
+        when(lookupService.getServiceUrl()).thenReturn(SERVICE_URL);
+        when(pulsarClient.getLookup()).thenReturn(lookupService);
+        constructorInterceptor = new ProducerConstructorInterceptor();
+    }
+
+    @Test
+    public void testOnConsumer() {
+        constructorInterceptor.onConstruct(enhancedInstance, new Object[] {pulsarClient, TOPIC_NAME});
+        ProducerEnhanceRequiredInfo requiredInfo = (ProducerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
+        assertThat(requiredInfo.getServiceUrl(), is(SERVICE_URL));
+        assertThat(requiredInfo.getTopic(), is(TOPIC_NAME));
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
new file mode 100644
index 0000000..7629f5c
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.helper.SegmentRefHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.hamcrest.MatcherAssert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import java.util.List;
+
+import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.PULSAR_CONSUMER;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class PulsarConsumerInterceptorTest {
+
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+
+    @Rule
+    public AgentServiceRule serviceRule = new AgentServiceRule();
+
+    private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
+
+    private PulsarConsumerInterceptor consumerInterceptor;
+
+    private MockMessage msg;
+
+    private EnhancedInstance consumerInstance = new EnhancedInstance() {
+        @Override public Object getSkyWalkingDynamicField() {
+            return consumerEnhanceRequiredInfo;
+        }
+
+        @Override public void setSkyWalkingDynamicField(Object value) {
+            consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo)value;
+        }
+    };
+
+    @Before
+    public void setUp() {
+        Config.Agent.ACTIVE_V1_HEADER = true;
+        consumerInterceptor = new PulsarConsumerInterceptor();
+        consumerEnhanceRequiredInfo = new ConsumerEnhanceRequiredInfo();
+
+        consumerEnhanceRequiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic");
+        consumerEnhanceRequiredInfo.setServiceUrl("pulsar://localhost:6650");
+        consumerEnhanceRequiredInfo.setSubscriptionName("my-sub");
+        msg = new MockMessage();
+        msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder()
+                .setKey("sw3")
+                .setValue("1.234.111|3|1|1|#192.168.1.8:18002|#/portal/|#testEntrySpan|#AQA*#AQA*Et0We0tQNQA*"));
+    }
+
+    @After
+    public void clear() {
+        Config.Agent.ACTIVE_V1_HEADER = false;
+    }
+
+    @Test
+    public void testConsumerWithNullMessage() throws Throwable {
+        consumerInterceptor.beforeMethod(consumerInstance, null, new Object[]{null}, new Class[0], null);
+        consumerInterceptor.afterMethod(consumerInstance, null, new Object[]{null}, new Class[0], null);
+
+        List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
+        assertThat(traceSegments.size(), is(0));
+    }
+
+    @Test
+    public void testConsumerWithMessage() throws Throwable {
+        consumerInterceptor.beforeMethod(consumerInstance, null, new Object[]{msg}, new Class[0], null);
+        consumerInterceptor.afterMethod(consumerInstance, null, new Object[]{msg}, new Class[0], null);
+
+        List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
+        assertThat(traceSegments.size(), is(1));
+
+        TraceSegment traceSegment = traceSegments.get(0);
+        List<TraceSegmentRef> refs = traceSegment.getRefs();
+        assertThat(refs.size(), is(1));
+        assertTraceSegmentRef(refs.get(0));
+
+        List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
+        assertThat(spans.size(), is(1));
+        assertConsumerSpan(spans.get(0));
+    }
+
+    private void assertConsumerSpan(AbstractTracingSpan span) {
+        SpanAssert.assertLayer(span, SpanLayer.MQ);
+        SpanAssert.assertComponent(span, PULSAR_CONSUMER);
+        SpanAssert.assertTagSize(span, 2);
+        SpanAssert.assertTag(span, 0, "pulsar://localhost:6650");
+        SpanAssert.assertTag(span, 1, "persistent://my-tenant/my-ns/my-topic");
+    }
+
+    private void assertTraceSegmentRef(TraceSegmentRef ref) {
+        MatcherAssert.assertThat(SegmentRefHelper.getEntryServiceInstanceId(ref), is(1));
+        MatcherAssert.assertThat(SegmentRefHelper.getSpanId(ref), is(3));
+        MatcherAssert.assertThat(SegmentRefHelper.getTraceSegmentId(ref).toString(), is("1.234.111"));
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptorTest.java
new file mode 100644
index 0000000..ecbf72d
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptorTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import java.util.List;
+
+import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.PULSAR_PRODUCER;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class PulsarProducerInterceptorTest {
+
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+
+    @Rule
+    public AgentServiceRule serviceRule = new AgentServiceRule();
+
+
+    private PulsarProducerInterceptor producerInterceptor;
+
+    private Object[] arguments;
+    private Class[] argumentType;
+
+    private EnhancedInstance pulsarProducerInstance = new EnhancedInstance() {
+
+
+        @Override public Object getSkyWalkingDynamicField() {
+            ProducerEnhanceRequiredInfo requiredInfo = new ProducerEnhanceRequiredInfo();
+            requiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic");
+            requiredInfo.setServiceUrl("pulsar://localhost:6650");
+            return requiredInfo;
+        }
+
+        @Override public void setSkyWalkingDynamicField(Object value) {
+        }
+    };
+
+    private MessageImpl msg = new MockMessage();
+
+    @Before
+    public void setUp() {
+        producerInterceptor = new PulsarProducerInterceptor();
+        arguments = new Object[] {msg, null};
+        argumentType = new Class[] {MessageImpl.class};
+    }
+
+    @Test
+    public void testSendMessage() throws Throwable {
+        producerInterceptor.beforeMethod(pulsarProducerInstance, null, arguments, argumentType, null);
+        producerInterceptor.afterMethod(pulsarProducerInstance, null, arguments, argumentType, null);
+
+        List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
+        assertThat(traceSegmentList.size(), is(1));
+
+        TraceSegment segment = traceSegmentList.get(0);
+        List<AbstractTracingSpan> spans = SegmentHelper.getSpans(segment);
+        assertThat(spans.size(), is(1));
+
+        assertMessageSpan(spans.get(0));
+    }
+
+    @Test
+    public void testSendWithNullMessage() throws Throwable {
+        producerInterceptor.beforeMethod(pulsarProducerInstance, null, new Object[]{null}, argumentType, null);
+        producerInterceptor.afterMethod(pulsarProducerInstance, null, new Object[]{null}, argumentType, null);
+        List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
+        assertThat(traceSegmentList.size(), is(0));
+    }
+
+    private void assertMessageSpan(AbstractTracingSpan span) {
+        SpanAssert.assertTag(span, 0, "pulsar://localhost:6650");
+        SpanAssert.assertTag(span, 1, "persistent://my-tenant/my-ns/my-topic");
+        SpanAssert.assertComponent(span, PULSAR_PRODUCER);
+        SpanAssert.assertLayer(span, SpanLayer.MQ);
+        assertThat(span.getOperationName(), is("Pulsar/persistent://my-tenant/my-ns/my-topic/Producer"));
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptorTest.java
new file mode 100644
index 0000000..177f959
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/SendCallbackInterceptorTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.skywalking.apm.agent.core.context.MockContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentRefAssert;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import java.util.List;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class SendCallbackInterceptorTest {
+
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+
+    @Rule
+    public AgentServiceRule serviceRule = new AgentServiceRule();
+
+    private SendCallbackInterceptor callbackInterceptor;
+
+    private Object[] arguments;
+    private Object[] argumentsWithException;
+    private Class[] argumentTypes;
+
+    private EnhancedInstance callBackInstance = new EnhancedInstance() {
+
+        @Override public Object getSkyWalkingDynamicField() {
+            SendCallbackEnhanceRequiredInfo requiredInfo = new SendCallbackEnhanceRequiredInfo();
+            requiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic");
+            requiredInfo.setContextSnapshot(MockContextSnapshot.INSTANCE.mockContextSnapshot());
+            return requiredInfo;
+        }
+
+        @Override public void setSkyWalkingDynamicField(Object value) {
+
+        }
+    };
+
+    @Before
+    public void setUp() {
+        callbackInterceptor = new SendCallbackInterceptor();
+
+        arguments = new Object[] {
+            null
+        };
+        argumentsWithException = new Object[] {
+            new RuntimeException()
+        };
+
+        argumentTypes = new Class[] {
+            Exception.class
+        };
+    }
+
+    @Test
+    public void testCallbackWithoutException() throws Throwable {
+        callbackInterceptor.beforeMethod(callBackInstance, null, arguments, argumentTypes, null);
+        callbackInterceptor.afterMethod(callBackInstance, null, arguments, argumentTypes, null);
+
+        List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
+        assertThat(traceSegments.size(), is(1));
+        TraceSegment traceSegment = traceSegments.get(0);
+
+        List<AbstractTracingSpan> abstractSpans = SegmentHelper.getSpans(traceSegment);
+        assertThat(abstractSpans.size(), is(1));
+
+        assertCallbackSpan(abstractSpans.get(0));
+
+        assertCallbackSegmentRef(traceSegment.getRefs());
+    }
+
+    @Test
+    public void testCallbackWithException() throws Throwable {
+        callbackInterceptor.beforeMethod(callBackInstance, null, argumentsWithException, argumentTypes, null);
+        callbackInterceptor.afterMethod(callBackInstance, null, argumentsWithException, argumentTypes, null);
+
+        List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
+        assertThat(traceSegments.size(), is(1));
+        TraceSegment traceSegment = traceSegments.get(0);
+
+        List<AbstractTracingSpan> abstractSpans = SegmentHelper.getSpans(traceSegment);
+        assertThat(abstractSpans.size(), is(1));
+
+        assertCallbackSpanWithException(abstractSpans.get(0));
+
+        assertCallbackSegmentRef(traceSegment.getRefs());
+    }
+
+    private void assertCallbackSpanWithException(AbstractTracingSpan span) {
+        assertCallbackSpan(span);
+
+        SpanAssert.assertException(SpanHelper.getLogs(span).get(0), RuntimeException.class);
+        assertThat(SpanHelper.getErrorOccurred(span), is(true));
+    }
+
+    private void assertCallbackSegmentRef(List<TraceSegmentRef> refs) {
+        assertThat(refs.size(), is(1));
+
+        TraceSegmentRef segmentRef = refs.get(0);
+        SegmentRefAssert.assertSpanId(segmentRef, 1);
+        assertThat(segmentRef.getEntryEndpointName(), is("/for-test-entryOperationName"));
+    }
+
+    private void assertCallbackSpan(AbstractTracingSpan span) {
+        assertThat(span.getOperationName(), is("Pulsar/Producer/Callback"));
+    }
+}
diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md
index 51ddb54..07db4d8 100644
--- a/docs/en/setup/service-agent/java-agent/Supported-list.md
+++ b/docs/en/setup/service-agent/java-agent/Supported-list.md
@@ -44,6 +44,7 @@
   * [Kafka](http://kafka.apache.org) 0.11.0.0 -> 1.0
   * [ActiveMQ](https://github.com/apache/activemq) 5.x
   * [RabbitMQ](https://www.rabbitmq.com/) 5.x
+  * [Pulsar](http://pulsar.apache.org) 2.2.x -> 2.4.x
 * NoSQL
   * Redis
     * [Jedis](https://github.com/xetorthio/jedis) 2.x
diff --git a/oap-server/server-starter/src/main/resources/component-libraries.yml b/oap-server/server-starter/src/main/resources/component-libraries.yml
index 8846376..778ebd1 100755
--- a/oap-server/server-starter/src/main/resources/component-libraries.yml
+++ b/oap-server/server-starter/src/main/resources/component-libraries.yml
@@ -242,7 +242,15 @@ Cassandra:
 Light4J:
   id: 71
   languages: Java
-
+Pulsar:
+  id: 72
+  languages: Java
+pulsar-producer:
+  id: 73
+  languages: Java
+pulsar-consumer:
+  id: 74
+  languages: Java
 
 # .NET/.NET Core components
 # [3000, 4000) for C#/.NET only
@@ -343,4 +351,6 @@ Component-Server-Mappings:
   Npgsql.EntityFrameworkCore.PostgreSQL: PostgreSQL
   transport-client: Elasticsearch
   SolrJ: Solr
-  cassandra-java-driver: Cassandra
\ No newline at end of file
+  cassandra-java-driver: Cassandra
+  pulsar-producer: Pulsar
+  pulsar-consumer: Pulsar
\ No newline at end of file