You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/09/24 02:21:39 UTC

[skywalking] branch master updated: MOD: Kafka-plugin Compatible with KafkaTemplate (#3505)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 8de9f7d  MOD: Kafka-plugin Compatible with KafkaTemplate (#3505)
8de9f7d is described below

commit 8de9f7d74ce2429391f92afbfc86360ffb529746
Author: Stalary <st...@163.com>
AuthorDate: Tue Sep 24 10:21:33 2019 +0800

    MOD: Kafka-plugin Compatible with KafkaTemplate (#3505)
    
    * MOD: Compatible with KafkaTemplate
    
    * MOD: modify test
---
 ...torMapInterceptor.java => CallbackAdapter.java} | 44 ++++++++++---
 ...uctorMapInterceptor.java => CallbackCache.java} | 40 +++++++++---
 ...or.java => CallbackConstructorInterceptor.java} | 23 ++++---
 .../apm/plugin/kafka/CallbackInterceptor.java      | 31 +++++++---
 .../apm/plugin/kafka/KafkaProducerInterceptor.java |  7 ++-
 ....java => KafkaTemplateCallbackInterceptor.java} | 25 +++++---
 .../kafka/ProducerConstructorMapInterceptor.java   |  6 +-
 .../AbstractKafkaTemplateInstrumentation.java}     | 20 +++---
 .../KafkaTemplateCallbackInstrumentation.java      | 66 ++++++++++++++++++++
 .../kafka/define/KafkaTemplateInstrumentation.java | 72 ++++++++++++++++++++++
 .../src/main/resources/skywalking-plugin.def       |  4 +-
 .../apm/plugin/kafka/CallbackInterceptorTest.java  |  4 +-
 12 files changed, 279 insertions(+), 63 deletions(-)

diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackAdapter.java
similarity index 50%
copy from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
copy to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackAdapter.java
index 8695995..f4a8142 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackAdapter.java
@@ -15,22 +15,48 @@
  * limitations under the License.
  *
  */
-
 package org.apache.skywalking.apm.plugin.kafka;
 
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
-import org.apache.skywalking.apm.util.StringUtil;
-
-import java.util.Map;
 
 /**
+ * implements Callback and EnhancedInstance, for transformation kafkaTemplate.buildCallback
  * @author stalary
  */
-public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
+public class CallbackAdapter implements Callback, EnhancedInstance {
+
+    private Object instance;
+
+    private Callback userCallback;
+
+    public CallbackAdapter(Callback userCallback, Object instance) {
+        this.userCallback = userCallback;
+        this.instance = instance;
+    }
+
+    public CallbackAdapter() {
+    }
+
     @Override
-    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
-        Map<String, Object> config = (Map<String, Object>) allArguments[0];
-        objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
+    public void onCompletion(RecordMetadata metadata, Exception exception) {
+        if (userCallback != null) {
+            userCallback.onCompletion(metadata, exception);
+        }
+    }
+
+    @Override
+    public Object getSkyWalkingDynamicField() {
+        return instance;
+    }
+
+    @Override
+    public void setSkyWalkingDynamicField(Object value) {
+        this.instance = value;
+    }
+
+    public Callback getUserCallback() {
+        return userCallback;
     }
 }
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackCache.java
similarity index 54%
copy from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
copy to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackCache.java
index 8695995..6e48f83 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackCache.java
@@ -15,22 +15,42 @@
  * limitations under the License.
  *
  */
-
 package org.apache.skywalking.apm.plugin.kafka;
 
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
-import org.apache.skywalking.apm.util.StringUtil;
-
-import java.util.Map;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
 
 /**
+ * cache Callback and ContextSnapshot
  * @author stalary
  */
-public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
+public class CallbackCache {
+
+    private Callback callback;
+
+    private ContextSnapshot snapshot;
+
+    public Callback getCallback() {
+        return callback;
+    }
+
+    public void setCallback(Callback callback) {
+        this.callback = callback;
+    }
+
+    public ContextSnapshot getSnapshot() {
+        return snapshot;
+    }
+
+    public void setSnapshot(ContextSnapshot snapshot) {
+        this.snapshot = snapshot;
+    }
+
     @Override
-    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
-        Map<String, Object> config = (Map<String, Object>) allArguments[0];
-        objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
+    public String toString() {
+        return "CallbackCache{" +
+            "callback=" + callback +
+            ", snapshot=" + snapshot +
+            '}';
     }
 }
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackConstructorInterceptor.java
similarity index 68%
copy from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
copy to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackConstructorInterceptor.java
index 8695995..9fb123a 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackConstructorInterceptor.java
@@ -18,19 +18,26 @@
 
 package org.apache.skywalking.apm.plugin.kafka;
 
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
-import org.apache.skywalking.apm.util.StringUtil;
-
-import java.util.Map;
 
 /**
+ * intercept Callback set cache
  * @author stalary
- */
-public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
+ **/
+public class CallbackConstructorInterceptor implements InstanceConstructorInterceptor {
+
     @Override
     public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
-        Map<String, Object> config = (Map<String, Object>) allArguments[0];
-        objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
+        Callback callback = (Callback) allArguments[0];
+        CallbackCache cache;
+        if (null != objInst.getSkyWalkingDynamicField()) {
+            cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
+        } else {
+            cache = new CallbackCache();
+        }
+        cache.setCallback(callback);
+        objInst.setSkyWalkingDynamicField(cache);
     }
-}
\ No newline at end of file
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
index cda65f2..68b41d9 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
@@ -38,27 +38,30 @@ public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {
     @Override
     public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              MethodInterceptResult result) throws Throwable {
-        //Get the SnapshotContext
-        ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
-        if (null != contextSnapshot) {
+        CallbackCache cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
+        if (null != cache) {
+            ContextSnapshot snapshot = getSnapshot(cache);
             RecordMetadata metadata = (RecordMetadata) allArguments[0];
             AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback");
             activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
             Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
-            ContextManager.continued(contextSnapshot);
+            ContextManager.continued(snapshot);
         }
     }
 
     @Override
     public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                               Object ret) throws Throwable {
-        ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
-        if (null != contextSnapshot) {
-            Exception exceptions = (Exception) allArguments[1];
-            if (exceptions != null) {
-                ContextManager.activeSpan().errorOccurred().log(exceptions);
+        CallbackCache cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
+        if (null != cache) {
+            ContextSnapshot snapshot = getSnapshot(cache);
+            if (null != snapshot) {
+                Exception exceptions = (Exception) allArguments[1];
+                if (exceptions != null) {
+                    ContextManager.activeSpan().errorOccurred().log(exceptions);
+                }
+                ContextManager.stopSpan();
             }
-            ContextManager.stopSpan();
         }
         return ret;
     }
@@ -68,4 +71,12 @@ public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {
                                       Class<?>[] argumentsTypes, Throwable t) {
         ContextManager.activeSpan().errorOccurred().log(t);
     }
+
+    private ContextSnapshot getSnapshot(CallbackCache cache) {
+        ContextSnapshot snapshot = cache.getSnapshot();
+        if (snapshot == null) {
+            snapshot = ((CallbackCache) ((EnhancedInstance) cache.getCallback()).getSkyWalkingDynamicField()).getSnapshot();
+        }
+        return snapshot;
+    }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java
index e6b23a8..5cc5abc 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java
@@ -61,12 +61,13 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto
             next = next.next();
             record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes());
         }
-
         EnhancedInstance callbackInstance = (EnhancedInstance) allArguments[1];
-        if (callbackInstance != null) {
+        if (null != callbackInstance) {
             ContextSnapshot snapshot = ContextManager.capture();
             if (null != snapshot) {
-                callbackInstance.setSkyWalkingDynamicField(snapshot);
+                CallbackCache cache = new CallbackCache();
+                cache.setSnapshot(snapshot);
+                callbackInstance.setSkyWalkingDynamicField(cache);
             }
         }
     }
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaTemplateCallbackInterceptor.java
similarity index 53%
copy from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
copy to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaTemplateCallbackInterceptor.java
index 8695995..a23b83a 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaTemplateCallbackInterceptor.java
@@ -15,22 +15,31 @@
  * limitations under the License.
  *
  */
-
 package org.apache.skywalking.apm.plugin.kafka;
 
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
-import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
 
-import java.util.Map;
+import java.lang.reflect.Method;
 
 /**
+ * transformation kafkaTemplate.buildCallback
  * @author stalary
  */
-public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
+public class KafkaTemplateCallbackInterceptor implements InstanceMethodsAroundInterceptor {
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+
+    }
+
     @Override
-    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
-        Map<String, Object> config = (Map<String, Object>) allArguments[0];
-        objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
+        return new CallbackAdapter((org.apache.kafka.clients.producer.Callback) ret, objInst);
+    }
+
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+
     }
 }
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
index 8695995..c9c0f54 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
@@ -28,9 +28,13 @@ import java.util.Map;
  * @author stalary
  */
 public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
+
     @Override
     public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
         Map<String, Object> config = (Map<String, Object>) allArguments[0];
-        objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
+        // prevent errors caused by secondary interception in kafkaTemplate
+        if (objInst.getSkyWalkingDynamicField() == null) {
+            objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
+        }
     }
 }
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaTemplateInstrumentation.java
similarity index 59%
copy from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
copy to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaTemplateInstrumentation.java
index 8695995..8f797d0 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaTemplateInstrumentation.java
@@ -16,21 +16,17 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka;
+package org.apache.skywalking.apm.plugin.kafka.define;
 
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
-import org.apache.skywalking.apm.util.StringUtil;
-
-import java.util.Map;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
 
 /**
+ * Compatible with KafkaTemplate
  * @author stalary
  */
-public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
-    @Override
-    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
-        Map<String, Object> config = (Map<String, Object>) allArguments[0];
-        objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
+public abstract class AbstractKafkaTemplateInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+    @Override protected String[] witnessClasses() {
+        return new String[]{"org.springframework.kafka.core.KafkaTemplate"};
     }
-}
\ No newline at end of file
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateCallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateCallbackInstrumentation.java
new file mode 100644
index 0000000..e8481dd
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateCallbackInstrumentation.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * InterceptorCallback wrapped CallbackAdapter, need intercept
+ * @author stalary
+ */
+public class KafkaTemplateCallbackInstrumentation extends AbstractKafkaTemplateInstrumentation {
+
+    private static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback";
+    private static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.producer.Callback";
+    private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.CallbackConstructorInterceptor";
+
+    @Override
+    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[]{
+            new ConstructorInterceptPoint() {
+                @Override
+                public ElementMatcher<MethodDescription> getConstructorMatcher() {
+                    return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
+                }
+
+                @Override
+                public String getConstructorInterceptor() {
+                    return CONSTRUCTOR_INTERCEPTOR_CLASS;
+                }
+            }
+        };
+    }
+
+    @Override
+    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[0];
+    }
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(ENHANCE_CLASS);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateInstrumentation.java
new file mode 100644
index 0000000..9b480a1
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateInstrumentation.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.apm.plugin.kafka.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * intercept kafkaTemplate.buildCallback translation callback
+ *
+ * @author stalary
+ */
+public class KafkaTemplateInstrumentation extends AbstractKafkaTemplateInstrumentation {
+
+    private static final String ENHANCE_CLASS = "org.springframework.kafka.core.KafkaTemplate";
+    private static final String ENHANCE_METHOD = "buildCallback";
+    private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaTemplateCallbackInterceptor";
+
+
+    @Override
+    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[0];
+    }
+
+    @Override
+    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[]{
+            new InstanceMethodsInterceptPoint() {
+                @Override
+                public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return named(ENHANCE_METHOD);
+                }
+
+                @Override
+                public String getMethodsInterceptor() {
+                    return INTERCEPTOR_CLASS;
+                }
+
+                @Override
+                public boolean isOverrideArgs() {
+                    return false;
+                }
+            }
+        };
+    }
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(ENHANCE_CLASS);
+    }
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def
index 1a2fb5e..64f3d72 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def
@@ -17,4 +17,6 @@
 kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstrumentation
 kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation
 kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation
-kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
\ No newline at end of file
+kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
+kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateInstrumentation
+kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java
index ac6c206..818a52a 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java
@@ -63,7 +63,9 @@ public class CallbackInterceptorTest {
 
     private EnhancedInstance callBackInstance = new EnhancedInstance() {
         @Override public Object getSkyWalkingDynamicField() {
-            return MockContextSnapshot.INSTANCE.mockContextSnapshot();
+            CallbackCache cache = new CallbackCache();
+            cache.setSnapshot(MockContextSnapshot.INSTANCE.mockContextSnapshot());
+            return cache;
         }
 
         @Override public void setSkyWalkingDynamicField(Object value) {