You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/04/26 03:17:08 UTC

[skywalking] branch master updated: feat: add enhance pulsar MessageListener instance (#6774)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 9de3724  feat: add enhance pulsar MessageListener instance (#6774)
9de3724 is described below

commit 9de37243c1d22f941a565067431c53eba326a5e4
Author: ZhangZhaoyuan <do...@gmail.com>
AuthorDate: Mon Apr 26 11:16:43 2021 +0800

    feat: add enhance pulsar MessageListener instance (#6774)
---
 CHANGES.md                                         |   1 +
 .../pulsar/ConsumerConstructorInterceptor.java     |   1 +
 .../plugin/pulsar/ConsumerEnhanceRequiredInfo.java |  13 ++
 .../pulsar/MessageConstructorInterceptor.java      |  39 +++++
 ...edInfo.java => MessageEnhanceRequiredInfo.java} |  32 ++--
 .../plugin/pulsar/PulsarConsumerInterceptor.java   |  16 +-
 ...java => PulsarConsumerListenerInterceptor.java} |  69 ++++----
 .../pulsar/define/MessageInstrumentation.java      |  71 ++++++++
 .../PulsarConsumerListenerInstrumentation.java     |  76 ++++++++
 .../src/main/resources/skywalking-plugin.def       |   4 +-
 .../pulsar/ConsumerConstructorInterceptorTest.java |   9 +
 .../skywalking/apm/plugin/pulsar/MockConsumer.java | 181 +++++++++++++++++++
 .../skywalking/apm/plugin/pulsar/MockMessage.java  |  15 +-
 .../pulsar/PulsarConsumerInterceptorTest.java      |  33 +++-
 ... => PulsarConsumerListenerInterceptorTest.java} |  94 +++++++---
 .../pulsar-scenario/config/expectedData.yaml       | 193 +++++++++++++--------
 .../testcase/pulsar/controller/CaseController.java |  47 +++--
 17 files changed, 717 insertions(+), 177 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 282e97a..57ce2ba 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -17,6 +17,7 @@ Release Notes.
 * Add Seata in the component definition. Seata plugin hosts on Seata project.
 * Extended Kafka plugin to properly trace consumers that have topic partitions directly assigned.
 * Support print SkyWalking context to logs.
+* Add `MessageListener` enhancement in pulsar plugin
 
 #### OAP-Backend
 * BugFix: filter invalid Envoy access logs whose socket address is empty.
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
index 173753b..d7070f7 100644
--- a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
@@ -45,6 +45,7 @@ public class ConsumerConstructorInterceptor implements InstanceConstructorInterc
         requireInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
         requireInfo.setTopic(topic);
         requireInfo.setSubscriptionName(consumerConfigurationData.getSubscriptionName());
+        requireInfo.setHasMessageListener(consumerConfigurationData.getMessageListener() != null);
         objInst.setSkyWalkingDynamicField(requireInfo);
     }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
index 25eec64..27c3861 100644
--- a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
@@ -38,6 +38,19 @@ public class ConsumerEnhanceRequiredInfo {
      */
     private String subscriptionName;
 
+    /**
+     * whether the consumer has a message listener
+     */
+    private boolean hasMessageListener;
+
+    public boolean isHasMessageListener() {
+        return hasMessageListener;
+    }
+
+    public void setHasMessageListener(boolean hasMessageListener) {
+        this.hasMessageListener = hasMessageListener;
+    }
+
     public String getServiceUrl() {
         return serviceUrl;
     }
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/MessageConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/MessageConstructorInterceptor.java
new file mode 100644
index 0000000..f7781ac
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/MessageConstructorInterceptor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+/**
+ * Interceptor of pulsar message constructor.
+ * <p>
+ * The interceptor creates the {@link MessageEnhanceRequiredInfo} required for passing the message span across
+ * threads. Another purpose of this interceptor is to make {@link ClassEnhancePluginDefine} have the enhanced class
+ * implement the {@link EnhancedInstance} interface, because {@link ClassEnhancePluginDefine} will not create the
+ * SkyWalkingDynamicField without any constructor or method interceptor.
+ */
+public class MessageConstructorInterceptor implements InstanceConstructorInterceptor {
+
+    @Override
+    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        objInst.setSkyWalkingDynamicField(new MessageEnhanceRequiredInfo());
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/MessageEnhanceRequiredInfo.java
similarity index 62%
copy from apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
copy to apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/MessageEnhanceRequiredInfo.java
index 25eec64..f047a3d 100644
--- a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/MessageEnhanceRequiredInfo.java
@@ -18,15 +18,12 @@
 
 package org.apache.skywalking.apm.plugin.pulsar;
 
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+
 /**
- * Pulsar consumer enhance required info is required by consumer enhanced object method interceptor
+ * Pulsar message enhance required info is required by consumer enhanced object method interceptor
  */
-public class ConsumerEnhanceRequiredInfo {
-
-    /**
-     * service url of the consumer
-     */
-    private String serviceUrl;
+public class MessageEnhanceRequiredInfo {
 
     /**
      * topic name of the consumer
@@ -34,16 +31,16 @@ public class ConsumerEnhanceRequiredInfo {
     private String topic;
 
     /**
-     * subscription name of the consumer
+     * receiving message span snapshot
      */
-    private String subscriptionName;
+    private ContextSnapshot contextSnapshot;
 
-    public String getServiceUrl() {
-        return serviceUrl;
+    public ContextSnapshot getContextSnapshot() {
+        return contextSnapshot;
     }
 
-    public void setServiceUrl(String serviceUrl) {
-        this.serviceUrl = serviceUrl;
+    public void setContextSnapshot(ContextSnapshot contextSnapshot) {
+        this.contextSnapshot = contextSnapshot;
     }
 
     public String getTopic() {
@@ -53,13 +50,4 @@ public class ConsumerEnhanceRequiredInfo {
     public void setTopic(String topic) {
         this.topic = topic;
     }
-
-    public String getSubscriptionName() {
-        return subscriptionName;
-    }
-
-    public void setSubscriptionName(String subscriptionName) {
-        this.subscriptionName = subscriptionName;
-    }
-
 }
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
index 414296f..4c4650d 100644
--- a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
@@ -41,7 +41,8 @@ import java.lang.reflect.Method;
  *  1. Get the @{@link ConsumerEnhanceRequiredInfo} and record the service url, topic name and subscription name
  *  2. Create the entry span when call <code>messageProcessed</code> method
  *  3. Extract all the <code>Trace Context</code> when call <code>messageProcessed</code> method
- *  4. Stop the entry span when <code>messageProcessed</code> method finished.
+ *  4. When <code>messageProcessed</code> finishes and the consumer has a message listener, capture the trace context and store it in the message's SkyWalkingDynamicField
+ *  5. Stop the entry span.
  * </pre>
  */
 public class PulsarConsumerInterceptor implements InstanceMethodsAroundInterceptor {
@@ -74,6 +75,19 @@ public class PulsarConsumerInterceptor implements InstanceMethodsAroundIntercept
     public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
         Object ret) throws Throwable {
         if (allArguments[0] != null) {
+            final ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst
+                    .getSkyWalkingDynamicField();
+            if (requiredInfo.isHasMessageListener()) {
+                EnhancedInstance msg = (EnhancedInstance) allArguments[0];
+                MessageEnhanceRequiredInfo messageEnhanceRequiredInfo = (MessageEnhanceRequiredInfo) msg
+                        .getSkyWalkingDynamicField();
+                if (messageEnhanceRequiredInfo == null) {
+                    messageEnhanceRequiredInfo = new MessageEnhanceRequiredInfo();
+                    msg.setSkyWalkingDynamicField(messageEnhanceRequiredInfo);
+                }
+                messageEnhanceRequiredInfo.setTopic(requiredInfo.getTopic());
+                messageEnhanceRequiredInfo.setContextSnapshot(ContextManager.capture());
+            }
             ContextManager.stopSpan();
         }
         return ret;
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptor.java
similarity index 52%
copy from apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
copy to apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptor.java
index 414296f..99da061 100644
--- a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptor.java
@@ -18,9 +18,7 @@
 
 package org.apache.skywalking.apm.plugin.pulsar;
 
-import org.apache.pulsar.client.api.Message;
-import org.apache.skywalking.apm.agent.core.context.CarrierItem;
-import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.pulsar.client.api.MessageListener;
 import org.apache.skywalking.apm.agent.core.context.ContextManager;
 import org.apache.skywalking.apm.agent.core.context.tag.Tags;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
@@ -33,57 +31,56 @@ import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
 import java.lang.reflect.Method;
 
 /**
- * Interceptor for pulsar consumer enhanced instance
+ * Interceptor for getting pulsar consumer message listener enhanced instance
  * <p>
  * Here is the intercept process steps:
  *
  * <pre>
- *  1. Get the @{@link ConsumerEnhanceRequiredInfo} and record the service url, topic name and subscription name
- *  2. Create the entry span when call <code>messageProcessed</code> method
- *  3. Extract all the <code>Trace Context</code> when call <code>messageProcessed</code> method
- *  4. Stop the entry span when <code>messageProcessed</code> method finished.
+ *  1. Return null if {@link org.apache.pulsar.client.impl.conf.ConsumerConfigurationData} has no message listener
+ *  2. Otherwise return a new lambda expression that wraps the original message listener
+ *  3. The lambda creates a local span that continues the message reception span
+ *  4. Stop the local span when the original message listener's <code>received</code> method finishes.
  * </pre>
  */
-public class PulsarConsumerInterceptor implements InstanceMethodsAroundInterceptor {
+public class PulsarConsumerListenerInterceptor implements InstanceMethodsAroundInterceptor {
 
     public static final String OPERATE_NAME_PREFIX = "Pulsar/";
-    public static final String CONSUMER_OPERATE_NAME = "/Consumer/";
+    public static final String CONSUMER_OPERATE_NAME = "/Consumer/MessageListener";
 
     @Override
     public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
-        MethodInterceptResult result) throws Throwable {
-        if (allArguments[0] != null) {
-            ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
-            Message msg = (Message) allArguments[0];
-            ContextCarrier carrier = new ContextCarrier();
-            CarrierItem next = carrier.items();
-            while (next.hasNext()) {
-                next = next.next();
-                next.setHeadValue(msg.getProperty(next.getHeadKey()));
-            }
-            AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + requiredInfo.getTopic() + CONSUMER_OPERATE_NAME + requiredInfo
-                .getSubscriptionName(), carrier);
-            activeSpan.setComponent(ComponentsDefine.PULSAR_CONSUMER);
-            SpanLayer.asMQ(activeSpan);
-            Tags.MQ_BROKER.set(activeSpan, requiredInfo.getServiceUrl());
-            Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopic());
-        }
+            MethodInterceptResult result) throws Throwable {
     }
 
     @Override
     public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
-        Object ret) throws Throwable {
-        if (allArguments[0] != null) {
-            ContextManager.stopSpan();
-        }
-        return ret;
+            Object ret) throws Throwable {
+        return ret == null ? null : (MessageListener) (consumer, message) -> {
+            final MessageEnhanceRequiredInfo requiredInfo = (MessageEnhanceRequiredInfo) ((EnhancedInstance) message)
+                    .getSkyWalkingDynamicField();
+            if (requiredInfo == null) {
+                ((MessageListener) ret).received(consumer, message);
+            } else {
+                AbstractSpan activeSpan = ContextManager
+                        .createLocalSpan(OPERATE_NAME_PREFIX + requiredInfo.getTopic() + CONSUMER_OPERATE_NAME);
+                activeSpan.setComponent(ComponentsDefine.PULSAR_CONSUMER);
+                SpanLayer.asMQ(activeSpan);
+                Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopic());
+                ContextManager.continued(requiredInfo.getContextSnapshot());
+
+                try {
+                    ((MessageListener) ret).received(consumer, message);
+                } catch (Exception e) {
+                    ContextManager.activeSpan().log(e);
+                } finally {
+                    ContextManager.stopSpan();
+                }
+            }
+        };
     }
 
     @Override
     public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
-        Class<?>[] argumentsTypes, Throwable t) {
-        if (allArguments[0] != null) {
-            ContextManager.activeSpan().log(t);
-        }
+            Class<?>[] argumentsTypes, Throwable t) {
     }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
new file mode 100644
index 0000000..6b61ae7
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.BooleanMatcher;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+
+/**
+ * Pulsar message instrumentation.
+ * <p>
+ * The message enhanced object is only for passing message reception span across threads.
+ * <p>
+ * A {@link org.apache.skywalking.apm.plugin.pulsar.MessageEnhanceRequiredInfo} is injected into the enhanced message
+ * object after the message process method if the consumer has a message listener.
+ * </p>
+ */
+public class MessageInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+    public static final String ENHANCE_CLASS = "org.apache.pulsar.client.api.Message";
+    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.MessageConstructorInterceptor";
+
+    @Override
+    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[]{
+                new ConstructorInterceptPoint() {
+                    @Override
+                    public ElementMatcher<MethodDescription> getConstructorMatcher() {
+                        return new BooleanMatcher<>(true);
+                    }
+
+                    @Override
+                    public String getConstructorInterceptor() {
+                        return CONSTRUCTOR_INTERCEPTOR_CLASS;
+                    }
+                }
+        };
+    }
+
+    @Override
+    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[0];
+    }
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byHierarchyMatch(ENHANCE_CLASS);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarConsumerListenerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarConsumerListenerInstrumentation.java
new file mode 100644
index 0000000..e501161
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarConsumerListenerInstrumentation.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * The pulsar consumer listener instrumentation enhances the <code>getMessageListener</code> method of
+ * <code>org.apache.pulsar.client.impl.conf.ConsumerConfigurationData</code>.
+ * <p>
+ * Users implement the {@link org.apache.pulsar.client.api.MessageListener} interface to consume messages, so
+ * intercepting the listener returned by that method covers all {@link org.apache.pulsar.client.api.MessageListener}
+ * instances and lets users get trace information in the message listener thread.
+ */
+public class PulsarConsumerListenerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.PulsarConsumerListenerInterceptor";
+    public static final String ENHANCE_METHOD = "getMessageListener";
+    public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.conf.ConsumerConfigurationData";
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(ENHANCE_CLASS);
+    }
+
+    @Override
+    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[0];
+    }
+
+    @Override
+    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[]{
+                new InstanceMethodsInterceptPoint() {
+                    @Override
+                    public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                        return named(ENHANCE_METHOD);
+                    }
+
+                    @Override
+                    public String getMethodsInterceptor() {
+                        return INTERCEPTOR_CLASS;
+                    }
+
+                    @Override
+                    public boolean isOverrideArgs() {
+                        return false;
+                    }
+                }
+        };
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
index a59280e..e2384cb 100644
--- a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
@@ -16,4 +16,6 @@
 
 pulsar=org.apache.skywalking.apm.plugin.pulsar.define.SendCallbackInstrumentation
 pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarConsumerInstrumentation
-pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarProducerInstrumentation
\ No newline at end of file
+pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarConsumerListenerInstrumentation
+pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarProducerInstrumentation
+pulsar=org.apache.skywalking.apm.plugin.pulsar.define.MessageInstrumentation
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
index 38a1259..cf7e9a8 100644
--- a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.apm.plugin.pulsar;
 
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -70,6 +71,13 @@ public class ConsumerConstructorInterceptorTest {
         when(lookupService.getServiceUrl()).thenReturn(SERVICE_URL);
         when(pulsarClient.getLookup()).thenReturn(lookupService);
         when(consumerConfigurationData.getSubscriptionName()).thenReturn(SUBSCRIPTION_NAME);
+        when(consumerConfigurationData.getMessageListener()).thenReturn((consumer, message) -> {
+            try {
+                consumer.acknowledge(message);
+            } catch (PulsarClientException e) {
+                e.printStackTrace();
+            }
+        });
         constructorInterceptor = new ConsumerConstructorInterceptor();
     }
 
@@ -84,5 +92,6 @@ public class ConsumerConstructorInterceptorTest {
         assertThat(requiredInfo.getServiceUrl(), is(SERVICE_URL));
         assertThat(requiredInfo.getTopic(), is(TOPIC_NAME));
         assertThat(requiredInfo.getSubscriptionName(), is(SUBSCRIPTION_NAME));
+        assertThat(requiredInfo.isHasMessageListener(), is(true));
     }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockConsumer.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockConsumer.java
new file mode 100644
index 0000000..a56c12b
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockConsumer.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+public class MockConsumer implements Consumer {
+
+    @Override
+    public String getTopic() {
+        return null;
+    }
+
+    @Override
+    public String getSubscription() {
+        return null;
+    }
+
+    @Override
+    public void unsubscribe() throws PulsarClientException {
+
+    }
+
+    @Override
+    public CompletableFuture<Void> unsubscribeAsync() {
+        return null;
+    }
+
+    @Override
+    public Message receive() throws PulsarClientException {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Message> receiveAsync() {
+        return null;
+    }
+
+    @Override
+    public Message receive(int i, TimeUnit timeUnit) throws PulsarClientException {
+        return null;
+    }
+
+    @Override
+    public void acknowledge(MessageId messageId) throws PulsarClientException {
+
+    }
+
+    @Override
+    public void negativeAcknowledge(MessageId messageId) {
+
+    }
+
+    @Override
+    public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
+
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId) {
+        return null;
+    }
+
+    @Override
+    public ConsumerStats getStats() {
+        return null;
+    }
+
+    @Override
+    public void close() throws PulsarClientException {
+
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return null;
+    }
+
+    @Override
+    public boolean hasReachedEndOfTopic() {
+        return false;
+    }
+
+    @Override
+    public void redeliverUnacknowledgedMessages() {
+
+    }
+
+    @Override
+    public void seek(MessageId messageId) throws PulsarClientException {
+
+    }
+
+    @Override
+    public void seek(long l) throws PulsarClientException {
+
+    }
+
+    @Override
+    public CompletableFuture<Void> seekAsync(MessageId messageId) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> seekAsync(long l) {
+        return null;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return false;
+    }
+
+    @Override
+    public String getConsumerName() {
+        return null;
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeCumulativeAsync(Message message) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeAsync(Message message) {
+        return null;
+    }
+
+    @Override
+    public void acknowledgeCumulative(Message message) throws PulsarClientException {
+
+    }
+
+    @Override
+    public void negativeAcknowledge(Message message) {
+
+    }
+
+    @Override
+    public void acknowledge(Message message) throws PulsarClientException {
+
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java
index 3162792..f70e5a3 100644
--- a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java
@@ -22,17 +22,20 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-public class MockMessage extends MessageImpl {
+public class MockMessage extends MessageImpl implements EnhancedInstance {
 
     private PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
 
     private transient Map<String, String> properties;
 
+    private Object enhancedSkyWalkingField;
+
     public MockMessage() {
         this(null, "1:1", new HashMap(), null, null);
     }
@@ -66,4 +69,14 @@ public class MockMessage extends MessageImpl {
     public String getProperty(String name) {
         return this.getProperties().get(name);
     }
+
+    @Override
+    public Object getSkyWalkingDynamicField() { // EnhancedInstance getter backed by a plain field
+        return enhancedSkyWalkingField;
+    }
+
+    @Override
+    public void setSkyWalkingDynamicField(Object value) { // EnhancedInstance setter; value is stored as-is
+        this.enhancedSkyWalkingField = value;
+    }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
index c17aab0..882984f 100644
--- a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
@@ -59,6 +59,8 @@ public class PulsarConsumerInterceptorTest {
 
     private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
 
+    private MessageEnhanceRequiredInfo messageEnhanceRequiredInfo;
+
     private PulsarConsumerInterceptor consumerInterceptor;
 
     private MockMessage msg;
@@ -85,9 +87,11 @@ public class PulsarConsumerInterceptorTest {
         consumerEnhanceRequiredInfo.setSubscriptionName("my-sub");
         msg = new MockMessage();
         msg.getMessageBuilder()
-           .addProperties(PulsarApi.KeyValue.newBuilder()
-                                            .setKey(SW8CarrierItem.HEADER_NAME)
-                                            .setValue("1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA="));
+                .addProperties(PulsarApi.KeyValue.newBuilder()
+                        .setKey(SW8CarrierItem.HEADER_NAME)
+                        .setValue("1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA="));
+        messageEnhanceRequiredInfo = new MessageEnhanceRequiredInfo();
+        msg.setSkyWalkingDynamicField(messageEnhanceRequiredInfo);
     }
 
     @Test
@@ -116,6 +120,29 @@ public class PulsarConsumerInterceptorTest {
         assertConsumerSpan(spans.get(0));
     }
 
+    @Test
+    public void testConsumerWithMessageListener() throws Throwable {
+        consumerEnhanceRequiredInfo.setHasMessageListener(true); // mark the consumer as having a listener
+        consumerInterceptor.beforeMethod(consumerInstance, null, new Object[]{msg}, new Class[0], null);
+        consumerInterceptor.afterMethod(consumerInstance, null, new Object[]{msg}, new Class[0], null);
+
+        List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
+        assertThat(traceSegments.size(), is(1)); // exactly one segment after the before/after pair
+
+        TraceSegment traceSegment = traceSegments.get(0);
+        assertNotNull(traceSegment.getRef());
+        assertTraceSegmentRef(traceSegment.getRef());
+
+        List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
+        assertThat(spans.size(), is(1));
+        assertConsumerSpan(spans.get(0));
+
+        final MessageEnhanceRequiredInfo requiredInfo = (MessageEnhanceRequiredInfo) msg.getSkyWalkingDynamicField();
+        assertThat(requiredInfo.getTopic(), is( // the message info must carry the consumer's topic
+                ((ConsumerEnhanceRequiredInfo) consumerInstance.getSkyWalkingDynamicField()).getTopic()));
+        assertNotNull(requiredInfo.getContextSnapshot()); // a context snapshot must be stored on the message
+    }
+
     private void assertConsumerSpan(AbstractTracingSpan span) {
         SpanAssert.assertLayer(span, SpanLayer.MQ);
         SpanAssert.assertComponent(span, PULSAR_CONSUMER);
diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptorTest.java
similarity index 56%
copy from apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
copy to apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptorTest.java
index c17aab0..3d86892 100644
--- a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptorTest.java
@@ -18,12 +18,14 @@
 
 package org.apache.skywalking.apm.plugin.pulsar;
 
+import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.skywalking.apm.agent.core.context.SW8CarrierItem;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
 import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
 import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
 import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef.SegmentRefType;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
 import org.apache.skywalking.apm.agent.test.helper.SegmentRefHelper;
@@ -45,25 +47,32 @@ import java.util.List;
 import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.PULSAR_CONSUMER;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 @RunWith(PowerMockRunner.class)
 @PowerMockRunnerDelegate(TracingSegmentRunner.class)
-public class PulsarConsumerInterceptorTest {
-
-    @SegmentStoragePoint
-    private SegmentStorage segmentStorage;
+public class PulsarConsumerListenerInterceptorTest {
 
     @Rule
     public AgentServiceRule serviceRule = new AgentServiceRule();
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+    private final EnhancedInstance consumerConfigurationDataInstance = new EnhancedInstance() {
+        @Override
+        public Object getSkyWalkingDynamicField() {
+            return null;
+        }
 
-    private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
-
-    private PulsarConsumerInterceptor consumerInterceptor;
-
+        @Override
+        public void setSkyWalkingDynamicField(Object value) {
+        }
+    };
+    private PulsarConsumerListenerInterceptor consumerListenerInterceptor;
     private MockMessage msg;
-
-    private EnhancedInstance consumerInstance = new EnhancedInstance() {
+    private PulsarConsumerInterceptor consumerInterceptor;
+    private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
+    private final EnhancedInstance consumerInstance = new EnhancedInstance() {
         @Override
         public Object getSkyWalkingDynamicField() {
             return consumerEnhanceRequiredInfo;
@@ -74,40 +83,72 @@ public class PulsarConsumerInterceptorTest {
             consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo) value;
         }
     };
+    private MockConsumer consumer;
+    private MessageListener messageListener;
 
     @Before
     public void setUp() {
         consumerInterceptor = new PulsarConsumerInterceptor();
+        consumerListenerInterceptor = new PulsarConsumerListenerInterceptor();
+        messageListener = (consumer, message) -> message.getTopicName(); // trivial listener: only reads the topic
         consumerEnhanceRequiredInfo = new ConsumerEnhanceRequiredInfo();
 
         consumerEnhanceRequiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic");
         consumerEnhanceRequiredInfo.setServiceUrl("pulsar://localhost:6650");
         consumerEnhanceRequiredInfo.setSubscriptionName("my-sub");
+        consumerEnhanceRequiredInfo.setHasMessageListener(true); // every test here runs with a listener flagged
         msg = new MockMessage();
         msg.getMessageBuilder()
-           .addProperties(PulsarApi.KeyValue.newBuilder()
-                                            .setKey(SW8CarrierItem.HEADER_NAME)
-                                            .setValue("1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA="));
+                .addProperties(PulsarApi.KeyValue.newBuilder()
+                        .setKey(SW8CarrierItem.HEADER_NAME)
+                        .setValue("1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA="));
+        msg.setSkyWalkingDynamicField(new MessageEnhanceRequiredInfo()); // fresh, empty info per test
+        consumer = new MockConsumer();
+    }
+
+    @Test
+    public void testWithNoMessageListener() throws Throwable {
+        consumerListenerInterceptor
+                .beforeMethod(consumerConfigurationDataInstance, null, new Object[0], new Class[0], null);
+        final MessageListener messageListener = (MessageListener) consumerListenerInterceptor
+                .afterMethod(consumerConfigurationDataInstance, null, new Object[0], new Class[0], null);
+
+        List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
+        assertThat(traceSegments.size(), is(0)); // nothing may be traced
+        assertNull(messageListener); // null passed as the last afterMethod argument must come back as null
     }
 
     @Test
-    public void testConsumerWithNullMessage() throws Throwable {
-        consumerInterceptor.beforeMethod(consumerInstance, null, new Object[] {null}, new Class[0], null);
-        consumerInterceptor.afterMethod(consumerInstance, null, new Object[] {null}, new Class[0], null);
+    public void testWithMessageListenerHasNoRequiredInfo() throws Throwable {
+        consumerListenerInterceptor
+                .beforeMethod(consumerConfigurationDataInstance, null, new Object[0], new Class[0], null);
+        final MessageListener enhancedMessageListener = (MessageListener) consumerListenerInterceptor
+                .afterMethod(consumerConfigurationDataInstance, null, new Object[0], new Class[0],
+                        this.messageListener);
+        assertNotNull(enhancedMessageListener); // afterMethod must hand back a listener
+        msg.setSkyWalkingDynamicField(null); // clear the required info attached in setUp
+        enhancedMessageListener.received(consumer, msg); // must not record a segment (asserted below)
 
         List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
         assertThat(traceSegments.size(), is(0));
     }
 
     @Test
-    public void testConsumerWithMessage() throws Throwable {
-        consumerInterceptor.beforeMethod(consumerInstance, null, new Object[] {msg}, new Class[0], null);
-        consumerInterceptor.afterMethod(consumerInstance, null, new Object[] {msg}, new Class[0], null);
+    public void testWithMessageListenerHasRequiredInfo() throws Throwable {
+        consumerInterceptor.beforeMethod(consumerInstance, null, new Object[]{msg}, new Class[0], null);
+        consumerInterceptor.afterMethod(consumerInstance, null, new Object[]{msg}, new Class[0], null);
+        consumerListenerInterceptor
+                .beforeMethod(consumerConfigurationDataInstance, null, new Object[0], new Class[0], null);
+        final MessageListener enhancedMessageListener = (MessageListener) consumerListenerInterceptor
+                .afterMethod(consumerConfigurationDataInstance, null, new Object[0], new Class[0],
+                        this.messageListener);
+        assertNotNull(enhancedMessageListener);
+        enhancedMessageListener.received(consumer, msg);
 
         List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
-        assertThat(traceSegments.size(), is(1));
+        assertThat(traceSegments.size(), is(2));
 
-        TraceSegment traceSegment = traceSegments.get(0);
+        TraceSegment traceSegment = traceSegments.get(1);
         assertNotNull(traceSegment.getRef());
         assertTraceSegmentRef(traceSegment.getRef());
 
@@ -119,14 +160,13 @@ public class PulsarConsumerInterceptorTest {
     private void assertConsumerSpan(AbstractTracingSpan span) {
         SpanAssert.assertLayer(span, SpanLayer.MQ);
         SpanAssert.assertComponent(span, PULSAR_CONSUMER);
-        SpanAssert.assertTagSize(span, 2);
-        SpanAssert.assertTag(span, 0, "pulsar://localhost:6650");
-        SpanAssert.assertTag(span, 1, "persistent://my-tenant/my-ns/my-topic");
     }
 
     private void assertTraceSegmentRef(TraceSegmentRef ref) {
-        MatcherAssert.assertThat(SegmentRefHelper.getParentServiceInstance(ref), is("instance"));
-        MatcherAssert.assertThat(SegmentRefHelper.getSpanId(ref), is(3));
-        MatcherAssert.assertThat(SegmentRefHelper.getTraceSegmentId(ref).toString(), is("3.4.5"));
+        MatcherAssert.assertThat(ref.getParentEndpoint(), // parent endpoint is built from topic and subscription
+                is("Pulsar/persistent://my-tenant/my-ns/my-topic/Consumer/my-sub"));
+        MatcherAssert.assertThat(SegmentRefHelper.getSpanId(ref), is(0)); // parent span id must be 0
+        MatcherAssert.assertThat(SegmentRefHelper.getTraceSegmentId(ref), is("3.4.5"));
+        MatcherAssert.assertThat(ref.getType(), is(SegmentRefType.CROSS_THREAD)); // must be a cross-thread ref
     }
 }
diff --git a/test/plugin/scenarios/pulsar-scenario/config/expectedData.yaml b/test/plugin/scenarios/pulsar-scenario/config/expectedData.yaml
index 1833bb2..a40eb9f 100644
--- a/test/plugin/scenarios/pulsar-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/pulsar-scenario/config/expectedData.yaml
@@ -15,79 +15,122 @@
 # limitations under the License.
 segmentItems:
 - serviceName: pulsar-scenario
-  segmentSize: ge 3
+  segmentSize: ge 6
   segments:
-  - segmentId: not null
-    spans:
-    - operationName: Pulsar/test/Producer
-      operationId: 0
-      parentSpanId: 0
-      spanId: 1
-      spanLayer: MQ
-      startTime: nq 0
-      endTime: nq 0
-      componentId: 73
-      isError: false
-      spanType: Exit
-      peer: not null
-      tags:
-      - {key: mq.broker, value: not null}
-      - {key: mq.topic, value: test}
-      skipAnalysis: 'false'
-    - operationName: /case/pulsar-case
-      operationId: 0
-      parentSpanId: -1
-      spanId: 0
-      spanLayer: Http
-      startTime: nq 0
-      endTime: nq 0
-      componentId: 14
-      isError: false
-      spanType: Entry
-      peer: ''
-      tags:
-      - {key: url, value: 'http://localhost:8080/pulsar-scenario/case/pulsar-case'}
-      - {key: http.method, value: GET}
-      skipAnalysis: 'false'
-  - segmentId: not null
-    spans:
-    - operationName: Pulsar/Producer/Callback
-      operationId: 0
-      parentSpanId: -1
-      spanId: 0
-      spanLayer: MQ
-      startTime: nq 0
-      endTime: nq 0
-      componentId: 73
-      isError: false
-      spanType: Local
-      peer: ''
-      tags:
-      - {key: mq.topic, value: test}
-      refs:
-      - {parentEndpoint: /case/pulsar-case, networkAddress: '', refType: CrossThread,
-        parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not
-          null, parentService: pulsar-scenario, traceId: not null}
-      skipAnalysis: 'false'
-  - segmentId: not null
-    spans:
-    - operationName: Pulsar/test/Consumer/test
-      operationId: 0
-      parentSpanId: -1
-      spanId: 0
-      spanLayer: MQ
-      startTime: nq 0
-      endTime: nq 0
-      componentId: 74
-      isError: false
-      spanType: Entry
-      peer: ''
-      tags:
-        - {key: transmission.latency, value: not null}
-        - {key: mq.broker, value: not null}
-        - {key: mq.topic, value: test}
-      refs:
-      - {parentEndpoint: /case/pulsar-case, networkAddress: not null, refType: CrossProcess,
-        parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not
-          null, parentService: pulsar-scenario, traceId: not null}
-      skipAnalysis: 'false'
+    - segmentId: not null
+      spans:
+        - operationName: Pulsar/test/Producer
+          operationId: 0
+          parentSpanId: 0
+          spanId: 1
+          spanLayer: MQ
+          startTime: nq 0
+          endTime: nq 0
+          componentId: 73
+          isError: false
+          spanType: Exit
+          peer: not null
+          tags:
+            - { key: mq.broker, value: not null }
+            - { key: mq.topic, value: test }
+          skipAnalysis: 'false'
+        - operationName: /case/pulsar-case
+          operationId: 0
+          parentSpanId: -1
+          spanId: 0
+          spanLayer: Http
+          startTime: nq 0
+          endTime: nq 0
+          componentId: 14
+          isError: false
+          spanType: Entry
+          peer: ''
+          tags:
+            - { key: url, value: 'http://localhost:8080/pulsar-scenario/case/pulsar-case' }
+            - { key: http.method, value: GET }
+          skipAnalysis: 'false'
+    - segmentId: not null
+      spans:
+        - operationName: Pulsar/Producer/Callback
+          operationId: 0
+          parentSpanId: -1
+          spanId: 0
+          spanLayer: MQ
+          startTime: nq 0
+          endTime: nq 0
+          componentId: 73
+          isError: false
+          spanType: Local
+          peer: ''
+          tags:
+            - { key: mq.topic, value: test }
+          refs:
+            - { parentEndpoint: /case/pulsar-case, networkAddress: '', refType: CrossThread,
+              parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not
+                                                                 null, parentService: pulsar-scenario, traceId: not null }
+          skipAnalysis: 'false'
+    - segmentId: not null
+      spans:
+        - operationName: Pulsar/test/Consumer/test
+          operationId: 0
+          parentSpanId: -1
+          spanId: 0
+          spanLayer: MQ
+          startTime: nq 0
+          endTime: nq 0
+          componentId: 74
+          isError: false
+          spanType: Entry
+          peer: ''
+          tags:
+            - { key: transmission.latency, value: not null }
+            - { key: mq.broker, value: not null }
+            - { key: mq.topic, value: test }
+          refs:
+            - { parentEndpoint: /case/pulsar-case, networkAddress: not null, refType: CrossProcess,
+              parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not
+                                                                 null, parentService: pulsar-scenario, traceId: not null }
+          skipAnalysis: 'false'
+    - segmentId: not null
+      spans:
+        - operationName: Pulsar/test/Consumer/testWithListener  # topic "test", subscription "testWithListener"
+          operationId: 0
+          parentSpanId: -1
+          spanId: 0
+          spanLayer: MQ
+          startTime: nq 0
+          endTime: nq 0
+          componentId: 74
+          isError: false
+          spanType: Entry
+          peer: ''
+          skipAnalysis: 'false'
+          tags:
+            - { key: transmission.latency, value: not null }
+            - { key: mq.broker, value: not null }
+            - { key: mq.topic, value: test }
+          refs:  # parent is span 1 of the /case/pulsar-case segment, reached across processes
+            - { parentEndpoint: /case/pulsar-case, networkAddress: not null,
+                refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null,
+                parentServiceInstance: not null, parentService: pulsar-scenario,
+                traceId: not null }
+    - segmentId: not null
+      spans:
+        - operationName: Pulsar/test/Consumer/MessageListener  # presumably the listener callback span; confirm against the plugin
+          operationId: 0
+          parentSpanId: -1
+          spanId: 0
+          spanLayer: MQ
+          startTime: nq 0
+          endTime: nq 0
+          componentId: 74
+          isError: false
+          spanType: Local
+          peer: ''
+          tags:
+            - { key: mq.topic, value: test }
+          refs:  # cross-thread child of span 0 of the testWithListener entry segment
+            - { parentEndpoint: Pulsar/test/Consumer/testWithListener, networkAddress: '', refType: CrossThread,
+                parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not null,
+                parentService: pulsar-scenario, traceId: not null }
+          skipAnalysis: 'false'
diff --git a/test/plugin/scenarios/pulsar-scenario/src/main/java/test/apache/skywalking/apm/testcase/pulsar/controller/CaseController.java b/test/plugin/scenarios/pulsar-scenario/src/main/java/test/apache/skywalking/apm/testcase/pulsar/controller/CaseController.java
index f25eb05..f2e62ff 100644
--- a/test/plugin/scenarios/pulsar-scenario/src/main/java/test/apache/skywalking/apm/testcase/pulsar/controller/CaseController.java
+++ b/test/plugin/scenarios/pulsar-scenario/src/main/java/test/apache/skywalking/apm/testcase/pulsar/controller/CaseController.java
@@ -18,8 +18,6 @@
 
 package test.apache.skywalking.apm.testcase.pulsar.controller;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.pulsar.client.api.Consumer;
@@ -33,6 +31,9 @@ import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 @Controller
 @RequestMapping("/case")
 @PropertySource("classpath:application.properties")
@@ -51,15 +52,38 @@ public class CaseController {
 
         String topic = "test";
 
+        CountDownLatch latch = new CountDownLatch(2);
+
         PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PULSAR_DOMAIN + serviceUrl).build();
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe();
 
-        producer.newMessage().key("testKey").value(Integer.toString(1).getBytes()).property("TEST", "TEST").send();
+        pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("testWithListener")
+                .messageListener((c, msg) -> {
+                    try {
+                        if (msg != null) {
+                            String propertiesFormat = "key = %s, value = %s";
+                            StringBuilder builder = new StringBuilder();
+                            msg.getProperties()
+                                    .forEach((k, v) -> builder.append(String.format(propertiesFormat, k, v))
+                                            .append(", "));
+                            LOGGER.info("Received message with messageId = {}, key = {}, value = {}, properties = {}",
+                                    msg.getMessageId(), msg
+                                            .getKey(), new String(msg.getValue()), builder.toString());
+                            // NOTE(review): new String(byte[]) above decodes with the platform default charset
+                        }
+                        c.acknowledge(msg); // NOTE(review): called even when msg is null; failures are logged below
+                    } catch (Exception e) {
+                        LOGGER.error("Receive message error", e);
+                    } finally {
+                        latch.countDown(); // always count down, even if handling failed
+                    }
+                }).subscribe();
 
-        CountDownLatch latch = new CountDownLatch(1);
+        producer.newMessage().key("testKey").value(Integer.toString(1).getBytes()).property("TEST", "TEST").send();
 
         Thread t = new Thread(() -> {
             try {
@@ -68,9 +92,10 @@ public class CaseController {
                     String propertiesFormat = "key = %s, value = %s";
                     StringBuilder builder = new StringBuilder();
                     msg.getProperties()
-                       .forEach((k, v) -> builder.append(String.format(propertiesFormat, k, v)).append(", "));
-                    LOGGER.info("Received message with messageId = {}, key = {}, value = {}, properties = {}", msg.getMessageId(), msg
-                        .getKey(), new String(msg.getValue()), builder.toString());
+                            .forEach((k, v) -> builder.append(String.format(propertiesFormat, k, v)).append(", "));
+                    LOGGER.info("Received message with messageId = {}, key = {}, value = {}, properties = {}",
+                            msg.getMessageId(), msg
+                                    .getKey(), new String(msg.getValue()), builder.toString());
 
                 }
                 consumer.acknowledge(msg);
@@ -101,10 +126,10 @@ public class CaseController {
     @ResponseBody
     public String healthCheck() throws InterruptedException {
         try (PulsarClient pulsarClient = PulsarClient.builder()
-                                                     .serviceUrl(PULSAR_DOMAIN + serviceUrl)
-                                                     .build(); Producer<byte[]> producer = pulsarClient.newProducer()
-                                                                                                       .topic("healthCheck")
-                                                                                                       .create()) {
+                .serviceUrl(PULSAR_DOMAIN + serviceUrl)
+                .build(); Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic("healthCheck")
+                .create()) {
             if (producer.isConnected()) {
                 return "Success";
             } else {