You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/09/24 02:21:39 UTC
[skywalking] branch master updated: MOD: Kafka-plugin Compatible with KafkaTemplate (#3505)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 8de9f7d MOD: Kafka-plugin Compatible with KafkaTemplate (#3505)
8de9f7d is described below
commit 8de9f7d74ce2429391f92afbfc86360ffb529746
Author: Stalary <st...@163.com>
AuthorDate: Tue Sep 24 10:21:33 2019 +0800
MOD: Kafka-plugin Compatible with KafkaTemplate (#3505)
* MOD: Compatible with KafkaTemplate
* MOD: modify test
---
...torMapInterceptor.java => CallbackAdapter.java} | 44 ++++++++++---
...uctorMapInterceptor.java => CallbackCache.java} | 40 +++++++++---
...or.java => CallbackConstructorInterceptor.java} | 23 ++++---
.../apm/plugin/kafka/CallbackInterceptor.java | 31 +++++++---
.../apm/plugin/kafka/KafkaProducerInterceptor.java | 7 ++-
....java => KafkaTemplateCallbackInterceptor.java} | 25 +++++---
.../kafka/ProducerConstructorMapInterceptor.java | 6 +-
.../AbstractKafkaTemplateInstrumentation.java} | 20 +++---
.../KafkaTemplateCallbackInstrumentation.java | 66 ++++++++++++++++++++
.../kafka/define/KafkaTemplateInstrumentation.java | 72 ++++++++++++++++++++++
.../src/main/resources/skywalking-plugin.def | 4 +-
.../apm/plugin/kafka/CallbackInterceptorTest.java | 4 +-
12 files changed, 279 insertions(+), 63 deletions(-)
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackAdapter.java
similarity index 50%
copy from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
copy to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackAdapter.java
index 8695995..f4a8142 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackAdapter.java
@@ -15,22 +15,48 @@
* limitations under the License.
*
*/
-
package org.apache.skywalking.apm.plugin.kafka;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
-import org.apache.skywalking.apm.util.StringUtil;
-
-import java.util.Map;
/**
+ * implements Callback and EnhancedInstance, for transformation kafkaTemplate.buildCallback
* @author stalary
*/
-public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
+public class CallbackAdapter implements Callback, EnhancedInstance {
+
+ private Object instance;
+
+ private Callback userCallback;
+
+ public CallbackAdapter(Callback userCallback, Object instance) {
+ this.userCallback = userCallback;
+ this.instance = instance;
+ }
+
+ public CallbackAdapter() {
+ }
+
@Override
- public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
- Map<String, Object> config = (Map<String, Object>) allArguments[0];
- objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (userCallback != null) {
+ userCallback.onCompletion(metadata, exception);
+ }
+ }
+
+ @Override
+ public Object getSkyWalkingDynamicField() {
+ return instance;
+ }
+
+ @Override
+ public void setSkyWalkingDynamicField(Object value) {
+ this.instance = value;
+ }
+
+ public Callback getUserCallback() {
+ return userCallback;
}
}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackCache.java
similarity index 54%
copy from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
copy to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackCache.java
index 8695995..6e48f83 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackCache.java
@@ -15,22 +15,42 @@
* limitations under the License.
*
*/
-
package org.apache.skywalking.apm.plugin.kafka;
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
-import org.apache.skywalking.apm.util.StringUtil;
-
-import java.util.Map;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
/**
+ * cache Callback and ContextSnapshot
* @author stalary
*/
-public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
+public class CallbackCache {
+
+ private Callback callback;
+
+ private ContextSnapshot snapshot;
+
+ public Callback getCallback() {
+ return callback;
+ }
+
+ public void setCallback(Callback callback) {
+ this.callback = callback;
+ }
+
+ public ContextSnapshot getSnapshot() {
+ return snapshot;
+ }
+
+ public void setSnapshot(ContextSnapshot snapshot) {
+ this.snapshot = snapshot;
+ }
+
@Override
- public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
- Map<String, Object> config = (Map<String, Object>) allArguments[0];
- objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
+ public String toString() {
+ return "CallbackCache{" +
+ "callback=" + callback +
+ ", snapshot=" + snapshot +
+ '}';
}
}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackConstructorInterceptor.java
similarity index 68%
copy from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
copy to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackConstructorInterceptor.java
index 8695995..9fb123a 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackConstructorInterceptor.java
@@ -18,19 +18,26 @@
package org.apache.skywalking.apm.plugin.kafka;
+import org.apache.kafka.clients.producer.Callback;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
-import org.apache.skywalking.apm.util.StringUtil;
-
-import java.util.Map;
/**
+ * intercept Callback set cache
* @author stalary
- */
-public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
+ **/
+public class CallbackConstructorInterceptor implements InstanceConstructorInterceptor {
+
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
- Map<String, Object> config = (Map<String, Object>) allArguments[0];
- objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
+ Callback callback = (Callback) allArguments[0];
+ CallbackCache cache;
+ if (null != objInst.getSkyWalkingDynamicField()) {
+ cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
+ } else {
+ cache = new CallbackCache();
+ }
+ cache.setCallback(callback);
+ objInst.setSkyWalkingDynamicField(cache);
}
-}
\ No newline at end of file
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
index cda65f2..68b41d9 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
@@ -38,27 +38,30 @@ public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
- //Get the SnapshotContext
- ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
- if (null != contextSnapshot) {
+ CallbackCache cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
+ if (null != cache) {
+ ContextSnapshot snapshot = getSnapshot(cache);
RecordMetadata metadata = (RecordMetadata) allArguments[0];
AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback");
activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
- ContextManager.continued(contextSnapshot);
+ ContextManager.continued(snapshot);
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
- ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
- if (null != contextSnapshot) {
- Exception exceptions = (Exception) allArguments[1];
- if (exceptions != null) {
- ContextManager.activeSpan().errorOccurred().log(exceptions);
+ CallbackCache cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
+ if (null != cache) {
+ ContextSnapshot snapshot = getSnapshot(cache);
+ if (null != snapshot) {
+ Exception exceptions = (Exception) allArguments[1];
+ if (exceptions != null) {
+ ContextManager.activeSpan().errorOccurred().log(exceptions);
+ }
+ ContextManager.stopSpan();
}
- ContextManager.stopSpan();
}
return ret;
}
@@ -68,4 +71,12 @@ public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
+
+ private ContextSnapshot getSnapshot(CallbackCache cache) {
+ ContextSnapshot snapshot = cache.getSnapshot();
+ if (snapshot == null) {
+ snapshot = ((CallbackCache) ((EnhancedInstance) cache.getCallback()).getSkyWalkingDynamicField()).getSnapshot();
+ }
+ return snapshot;
+ }
}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java
index e6b23a8..5cc5abc 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java
@@ -61,12 +61,13 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto
next = next.next();
record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes());
}
-
EnhancedInstance callbackInstance = (EnhancedInstance) allArguments[1];
- if (callbackInstance != null) {
+ if (null != callbackInstance) {
ContextSnapshot snapshot = ContextManager.capture();
if (null != snapshot) {
- callbackInstance.setSkyWalkingDynamicField(snapshot);
+ CallbackCache cache = new CallbackCache();
+ cache.setSnapshot(snapshot);
+ callbackInstance.setSkyWalkingDynamicField(cache);
}
}
}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaTemplateCallbackInterceptor.java
similarity index 53%
copy from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
copy to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaTemplateCallbackInterceptor.java
index 8695995..a23b83a 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaTemplateCallbackInterceptor.java
@@ -15,22 +15,31 @@
* limitations under the License.
*
*/
-
package org.apache.skywalking.apm.plugin.kafka;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
-import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
-import java.util.Map;
+import java.lang.reflect.Method;
/**
+ * transformation kafkaTemplate.buildCallback
* @author stalary
*/
-public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
+public class KafkaTemplateCallbackInterceptor implements InstanceMethodsAroundInterceptor {
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+
+ }
+
@Override
- public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
- Map<String, Object> config = (Map<String, Object>) allArguments[0];
- objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
+ return new CallbackAdapter((org.apache.kafka.clients.producer.Callback) ret, objInst);
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+
}
}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
index 8695995..c9c0f54 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
@@ -28,9 +28,13 @@ import java.util.Map;
* @author stalary
*/
public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
+
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
Map<String, Object> config = (Map<String, Object>) allArguments[0];
- objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
+ // prevent errors caused by secondary interception in kafkaTemplate
+ if (objInst.getSkyWalkingDynamicField() == null) {
+ objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
+ }
}
}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaTemplateInstrumentation.java
similarity index 59%
copy from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
copy to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaTemplateInstrumentation.java
index 8695995..8f797d0 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaTemplateInstrumentation.java
@@ -16,21 +16,17 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka;
+package org.apache.skywalking.apm.plugin.kafka.define;
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
-import org.apache.skywalking.apm.util.StringUtil;
-
-import java.util.Map;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
/**
+ * Compatible with KafkaTemplate
* @author stalary
*/
-public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
- @Override
- public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
- Map<String, Object> config = (Map<String, Object>) allArguments[0];
- objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
+public abstract class AbstractKafkaTemplateInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+ @Override protected String[] witnessClasses() {
+ return new String[]{"org.springframework.kafka.core.KafkaTemplate"};
}
-}
\ No newline at end of file
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateCallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateCallbackInstrumentation.java
new file mode 100644
index 0000000..e8481dd
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateCallbackInstrumentation.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * InterceptorCallback wrapped CallbackAdapter, need intercept
+ * @author stalary
+ */
+public class KafkaTemplateCallbackInstrumentation extends AbstractKafkaTemplateInstrumentation {
+
+ private static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback";
+ private static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.producer.Callback";
+ private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.CallbackConstructorInterceptor";
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[]{
+ new ConstructorInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription> getConstructorMatcher() {
+ return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
+ }
+
+ @Override
+ public String getConstructorInterceptor() {
+ return CONSTRUCTOR_INTERCEPTOR_CLASS;
+ }
+ }
+ };
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[0];
+ }
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateInstrumentation.java
new file mode 100644
index 0000000..9b480a1
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateInstrumentation.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.apm.plugin.kafka.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * intercept kafkaTemplate.buildCallback translation callback
+ *
+ * @author stalary
+ */
+public class KafkaTemplateInstrumentation extends AbstractKafkaTemplateInstrumentation {
+
+ private static final String ENHANCE_CLASS = "org.springframework.kafka.core.KafkaTemplate";
+ private static final String ENHANCE_METHOD = "buildCallback";
+ private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaTemplateCallbackInterceptor";
+
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[]{
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription> getMethodsMatcher() {
+ return named(ENHANCE_METHOD);
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def
index 1a2fb5e..64f3d72 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def
@@ -17,4 +17,6 @@
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation
-kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
\ No newline at end of file
+kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
+kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateInstrumentation
+kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java
index ac6c206..818a52a 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java
@@ -63,7 +63,9 @@ public class CallbackInterceptorTest {
private EnhancedInstance callBackInstance = new EnhancedInstance() {
@Override public Object getSkyWalkingDynamicField() {
- return MockContextSnapshot.INSTANCE.mockContextSnapshot();
+ CallbackCache cache = new CallbackCache();
+ cache.setSnapshot(MockContextSnapshot.INSTANCE.mockContextSnapshot());
+ return cache;
}
@Override public void setSkyWalkingDynamicField(Object value) {