You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/09/04 01:40:42 UTC
[skywalking] branch master updated: MOD: Modify compatibility of
kafka plugin and expand operationName (#3390)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 05d9c0a MOD: Modify compatibility of kafka plugin and expand operationName (#3390)
05d9c0a is described below
commit 05d9c0a93a1d7e2decdf985d36bffa437f23a922
Author: Stalary <st...@163.com>
AuthorDate: Wed Sep 4 09:40:33 2019 +0800
MOD: Modify compatibility of kafka plugin and expand operationName (#3390)
* ADD: add operationName length threshold
* MOD:move operationName threshold to Config, simplified code
* MOD:update agent set-up document, clean code
* MOD:add agent.operation_name_threshold conf prefix
* MOD: Modify compatibility of kafka plugin and expand operationName
* MOD: Delete localSpan layer, modify consumer operationName, delete producer key
* MOD: Modify operationName, reduce String operation
* FIX: Fix callback break, the new version of kafka callback is intercepted twice and needs to be judged
* FIX: Fix different problems with KafkaProducer constructor in old and new versions
* MOD: separate code
---
.../{kafka-v1-plugin => kafka-plugin}/pom.xml | 2 +-
.../apm/plugin/kafka}/CallbackInterceptor.java | 40 ++++++++++-----
.../kafka}/ConsumerConstructorInterceptor.java | 6 ++-
.../plugin/kafka}/ConsumerEnhanceRequiredInfo.java | 11 +++-
.../plugin/kafka}/KafkaConsumerInterceptor.java | 8 +--
.../plugin/kafka}/KafkaProducerInterceptor.java | 33 +++++++-----
.../kafka}/ProducerConstructorInterceptor.java | 7 +--
.../kafka/ProducerConstructorMapInterceptor.java} | 19 ++++---
.../plugin/kafka}/SubscribeMethodInterceptor.java | 2 +-
.../define/AbstractKafkaInstrumentation.java | 2 +-
.../kafka}/define/CallbackInstrumentation.java | 4 +-
.../define/KafkaConsumerInstrumentation.java | 13 ++---
.../define/KafkaProducerInstrumentation.java | 39 ++++++++------
.../define/KafkaProducerMapInstrumentation.java} | 37 ++++++++-----
.../src/main/resources/skywalking-plugin.def | 8 +--
.../apm/plugin/kafka}/CallbackInterceptorTest.java | 15 ++----
.../kafka}/ConsumerConstructorInterceptorTest.java | 5 +-
.../kafka}/KafkaConsumerInterceptorTest.java | 20 +++-----
.../kafka}/KafkaProducerInterceptorTest.java | 14 ++---
.../kafka}/ProducerConstructorInterceptorTest.java | 8 +--
.../kafka}/SubscribeMethodInterceptorTest.java | 11 ++--
.../ProducerRecordConstructorInterceptorTest.java | 60 ----------------------
apm-sniffer/apm-sdk-plugin/pom.xml | 2 +-
23 files changed, 175 insertions(+), 191 deletions(-)
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml
similarity index 96%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/pom.xml
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml
index 6287dce..0c830f2 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml
@@ -25,7 +25,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>apm-kafka-v1-plugin</artifactId>
+ <artifactId>apm-kafka-plugin</artifactId>
<properties>
<kafka-clients.version>0.11.0.0</kafka-clients.version>
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/CallbackInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
similarity index 58%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/CallbackInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
index 8e38bd9..cda65f2 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/CallbackInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
@@ -16,42 +16,56 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
-import java.lang.reflect.Method;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+import java.lang.reflect.Method;
+/**
+ * @author zhang xin, stalary
+ **/
public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {
+
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
- MethodInterceptResult result) throws Throwable {
- AbstractSpan abstractSpan = ContextManager.createLocalSpan("Producer/Callback");
-
+ MethodInterceptResult result) throws Throwable {
//Get the SnapshotContext
- ContextSnapshot contextSnapshot = (ContextSnapshot)objInst.getSkyWalkingDynamicField();
+ ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
if (null != contextSnapshot) {
+ RecordMetadata metadata = (RecordMetadata) allArguments[0];
+ AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback");
+ activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
+ Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
ContextManager.continued(contextSnapshot);
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
- Object ret) throws Throwable {
- Exception exceptions = (Exception)allArguments[1];
- if (exceptions != null) {
- ContextManager.activeSpan().errorOccurred().log(exceptions);
+ Object ret) throws Throwable {
+ ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
+ if (null != contextSnapshot) {
+ Exception exceptions = (Exception) allArguments[1];
+ if (exceptions != null) {
+ ContextManager.activeSpan().errorOccurred().log(exceptions);
+ }
+ ContextManager.stopSpan();
}
- ContextManager.stopSpan();
return ret;
}
- @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
- Class<?>[] argumentsTypes, Throwable t) {
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+ Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
similarity index 91%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerConstructorInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
index 77880a7..35124a3 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerConstructorInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
@@ -16,12 +16,15 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+/**
+ * @author zhang xin, stalary
+ **/
public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
@Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
@@ -29,6 +32,7 @@ public class ConsumerConstructorInterceptor implements InstanceConstructorInterc
// set the bootstrap server address
ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
requiredInfo.setBrokerServers(config.getList("bootstrap.servers"));
+ requiredInfo.setGroupId(config.getString("group.id"));
objInst.setSkyWalkingDynamicField(requiredInfo);
}
}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerEnhanceRequiredInfo.java
similarity index 87%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerEnhanceRequiredInfo.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerEnhanceRequiredInfo.java
index 62018be..8855623 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerEnhanceRequiredInfo.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerEnhanceRequiredInfo.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
import java.util.Collection;
import java.util.List;
@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.util.StringUtil;
public class ConsumerEnhanceRequiredInfo {
private String brokerServers;
private String topics;
+ private String groupId;
private long startTime;
public void setBrokerServers(List<String> brokerServers) {
@@ -35,6 +36,10 @@ public class ConsumerEnhanceRequiredInfo {
this.topics = StringUtil.join(';', topics.toArray(new String[0]));
}
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
+ }
+
public String getBrokerServers() {
return brokerServers;
}
@@ -43,6 +48,10 @@ public class ConsumerEnhanceRequiredInfo {
return topics;
}
+ public String getGroupId() {
+ return groupId;
+ }
+
public void setStartTime(long startTime) {
this.startTime = startTime;
}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java
similarity index 94%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaConsumerInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java
index 9c0c5d2..99753c8 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaConsumerInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
import java.lang.reflect.Method;
import java.util.Iterator;
@@ -37,12 +37,12 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInt
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
/**
- * @author zhang xin
+ * @author zhang xin, stalary
*/
public class KafkaConsumerInterceptor implements InstanceMethodsAroundInterceptor {
public static final String OPERATE_NAME_PREFIX = "Kafka/";
- public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";
+ public static final String CONSUMER_OPERATE_NAME = "/Consumer/";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
@@ -60,7 +60,7 @@ public class KafkaConsumerInterceptor implements InstanceMethodsAroundIntercepto
//
if (records.size() > 0) {
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
- AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME_SUFFIX, null).start(requiredInfo.getStartTime());
+ AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME + requiredInfo.getGroupId(), null).start(requiredInfo.getStartTime());
activeSpan.setComponent(ComponentsDefine.KAFKA_CONSUMER);
SpanLayer.asMQ(activeSpan);
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaProducerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java
similarity index 73%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaProducerInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java
index 5880a1a..e6b23a8 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaProducerInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java
@@ -16,13 +16,13 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
-import java.lang.reflect.Method;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
@@ -31,8 +31,10 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceM
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import java.lang.reflect.Method;
+
/**
- * @author zhang xin
+ * @author zhang xin, stalary
*/
public class KafkaProducerInterceptor implements InstanceMethodsAroundInterceptor {
@@ -41,16 +43,15 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
- MethodInterceptResult result) throws Throwable {
+ MethodInterceptResult result) throws Throwable {
ContextCarrier contextCarrier = new ContextCarrier();
- ProducerRecord record = (ProducerRecord)allArguments[0];
- String topicName = (String)((EnhancedInstance)record).getSkyWalkingDynamicField();
+ ProducerRecord record = (ProducerRecord) allArguments[0];
+ String topicName = record.topic();
+ AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String) objInst.getSkyWalkingDynamicField());
- AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String)objInst.getSkyWalkingDynamicField());
-
- Tags.MQ_BROKER.set(activeSpan, (String)objInst.getSkyWalkingDynamicField());
+ Tags.MQ_BROKER.set(activeSpan, (String) objInst.getSkyWalkingDynamicField());
Tags.MQ_TOPIC.set(activeSpan, topicName);
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
@@ -61,21 +62,25 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto
record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes());
}
- EnhancedInstance callbackInstance = (EnhancedInstance)allArguments[1];
+ EnhancedInstance callbackInstance = (EnhancedInstance) allArguments[1];
if (callbackInstance != null) {
- callbackInstance.setSkyWalkingDynamicField(ContextManager.capture());
+ ContextSnapshot snapshot = ContextManager.capture();
+ if (null != snapshot) {
+ callbackInstance.setSkyWalkingDynamicField(snapshot);
+ }
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
- Object ret) throws Throwable {
+ Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
- @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
- Class<?>[] argumentsTypes, Throwable t) {
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+ Class<?>[] argumentsTypes, Throwable t) {
}
}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptor.java
similarity index 86%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerConstructorInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptor.java
index 604f080..c9c0d51 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerConstructorInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptor.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
@@ -25,8 +25,9 @@ import org.apache.skywalking.apm.util.StringUtil;
public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {
- @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
- ProducerConfig config = (ProducerConfig)allArguments[0];
+ @Override
+ public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+ ProducerConfig config = (ProducerConfig) allArguments[0];
objInst.setSkyWalkingDynamicField(StringUtil.join(';', config.getList("bootstrap.servers").toArray(new String[0])));
}
}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerRecordConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
similarity index 65%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerRecordConstructorInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
index 1a2d5e3..8695995 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerRecordConstructorInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
@@ -16,14 +16,21 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+import org.apache.skywalking.apm.util.StringUtil;
-public class ProducerRecordConstructorInterceptor implements InstanceConstructorInterceptor {
- @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
- String topic = (String)allArguments[0];
- objInst.setSkyWalkingDynamicField(topic);
+import java.util.Map;
+
+/**
+ * @author stalary
+ */
+public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
+ @Override
+ public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+ Map<String, Object> config = (Map<String, Object>) allArguments[0];
+ objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
}
-}
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/SubscribeMethodInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java
similarity index 97%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/SubscribeMethodInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java
index 7c4e655..0453a5e 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/SubscribeMethodInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
import java.lang.reflect.Method;
import java.util.Collection;
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/AbstractKafkaInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaInstrumentation.java
similarity index 95%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/AbstractKafkaInstrumentation.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaInstrumentation.java
index e737441..cf86711 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/AbstractKafkaInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaInstrumentation.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v1.define;
+package org.apache.skywalking.apm.plugin.kafka.define;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/CallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/CallbackInstrumentation.java
similarity index 95%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/CallbackInstrumentation.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/CallbackInstrumentation.java
index 922968c..7bf7b40 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/CallbackInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/CallbackInstrumentation.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v1.define;
+package org.apache.skywalking.apm.plugin.kafka.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
@@ -31,7 +31,7 @@ public class CallbackInstrumentation extends AbstractKafkaInstrumentation {
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.Callback";
public static final String ENHANCE_METHOD = "onCompletion";
- public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.CallbackInterceptor";
+ public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.CallbackInterceptor";
@Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
similarity index 90%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaConsumerInstrumentation.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
index 0aef949..cfc9471 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaConsumerInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v1.define;
+package org.apache.skywalking.apm.plugin.kafka.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
@@ -38,18 +38,19 @@ import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName
* 4. Stop the entry span when <code>pollOnce</code> method finished.
* </pre>
*
- * @author zhang xin
+ * @author zhang xin,stalary
*/
public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerConfig";
- public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.ConsumerConstructorInterceptor";
- public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.KafkaConsumerInterceptor";
+ public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConsumerConstructorInterceptor";
+ public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor";
public static final String ENHANCE_METHOD = "pollOnce";
+ public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
public static final String SUBSCRIBE_METHOD = "subscribe";
public static final String SUBSCRIBE_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerRebalanceListener";
- public static final String SUBSCRIBE_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.SubscribeMethodInterceptor";
+ public static final String SUBSCRIBE_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.kafka.SubscribeMethodInterceptor";
@Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
@@ -69,7 +70,7 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
- return named(ENHANCE_METHOD);
+ return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD));
}
@Override public String getMethodsInterceptor() {
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaProducerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerInstrumentation.java
similarity index 71%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaProducerInstrumentation.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerInstrumentation.java
index fa702c8..9fe0d3b 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaProducerInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerInstrumentation.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v1.define;
+package org.apache.skywalking.apm.plugin.kafka.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
@@ -39,48 +39,57 @@ import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName
* 3. Stop the exit span when <code>send</code> method finished.
* </pre>
*
- * @author zhang xin
+ * @author zhang xin, stalary
*/
public class KafkaProducerInstrumentation extends AbstractKafkaInstrumentation {
- public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.KafkaProducerInterceptor";
+ public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaProducerInterceptor";
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer";
- public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.ProducerConstructorInterceptor";
+ public static final String ENHANCE_METHOD = "doSend";
+ public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ProducerConstructorInterceptor";
public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "org.apache.kafka.clients.producer.ProducerConfig";
- @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
- return new ConstructorInterceptPoint[] {
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[]{
new ConstructorInterceptPoint() {
- @Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
+ @Override
+ public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG);
}
- @Override public String getConstructorInterceptor() {
+ @Override
+ public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
- @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
- return new InstanceMethodsInterceptPoint[] {
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
- @Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
- return named("doSend");
+ @Override
+ public ElementMatcher<MethodDescription> getMethodsMatcher() {
+ return named(ENHANCE_METHOD);
}
- @Override public String getMethodsInterceptor() {
+ @Override
+ public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
- @Override public boolean isOverrideArgs() {
+ @Override
+ public boolean isOverrideArgs() {
return false;
}
}
};
}
- @Override protected ClassMatch enhanceClass() {
+ @Override
+ protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/ProducerRecordInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerMapInstrumentation.java
similarity index 60%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/ProducerRecordInstrumentation.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerMapInstrumentation.java
index e40f14b..8888c80 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/ProducerRecordInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerMapInstrumentation.java
@@ -15,8 +15,7 @@
* limitations under the License.
*
*/
-
-package org.apache.skywalking.apm.plugin.kafka.v1.define;
+package org.apache.skywalking.apm.plugin.kafka.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
@@ -24,33 +23,43 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterc
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
-import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
-public class ProducerRecordInstrumentation extends AbstractKafkaInstrumentation {
+/**
+ * after version 2.1.0 use Map to config
+ * @author stalary
+ */
+public class KafkaProducerMapInstrumentation extends AbstractKafkaInstrumentation {
- public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.ProducerRecordConstructorInterceptor";
- public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.ProducerRecord";
+ public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer";
+ public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ProducerConstructorMapInterceptor";
+ public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "java.util.Map";
- @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
- return new ConstructorInterceptPoint[] {
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[]{
new ConstructorInterceptPoint() {
- @Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
- return takesArguments(6);
+ @Override
+ public ElementMatcher<MethodDescription> getConstructorMatcher() {
+ return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG);
}
- @Override public String getConstructorInterceptor() {
+ @Override
+ public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
- @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}
- @Override protected ClassMatch enhanceClass() {
+ @Override
+ protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
-}
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def
similarity index 67%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/resources/skywalking-plugin.def
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def
index 59e564a..1a2fb5e 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/resources/skywalking-plugin.def
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.CallbackInstrumentation
-kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.KafkaConsumerInstrumentation
-kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.KafkaProducerInstrumentation
-kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.ProducerRecordInstrumentation
+kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstrumentation
+kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation
+kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation
+kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/CallbackInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java
similarity index 89%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/CallbackInterceptorTest.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java
index 0629b20..ac6c206 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/CallbackInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java
@@ -16,9 +16,8 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v11;
+package org.apache.skywalking.apm.plugin.kafka;
-import java.util.List;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.skywalking.apm.agent.core.context.MockContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
@@ -27,13 +26,7 @@ import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
-import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
-import org.apache.skywalking.apm.agent.test.tools.SegmentRefAssert;
-import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
-import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
-import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
-import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
-import org.apache.skywalking.apm.plugin.kafka.v1.CallbackInterceptor;
+import org.apache.skywalking.apm.agent.test.tools.*;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -43,6 +36,8 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import java.util.List;
+
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
@@ -143,6 +138,6 @@ public class CallbackInterceptorTest {
}
private void assertCallbackSpan(AbstractTracingSpan span) {
- assertThat(span.getOperationName(), is("Producer/Callback"));
+ assertThat(span.getOperationName(), is("Kafka/Producer/Callback"));
}
}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ConsumerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java
similarity index 92%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ConsumerConstructorInterceptorTest.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java
index ad642dd..dff6400 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ConsumerConstructorInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java
@@ -16,17 +16,16 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v11;
+package org.apache.skywalking.apm.plugin.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerConstructorInterceptor;
-import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerEnhanceRequiredInfo;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+
import java.util.ArrayList;
import java.util.List;
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptorTest.java
similarity index 91%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaConsumerInterceptorTest.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptorTest.java
index dc8a832..aa3543b 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaConsumerInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptorTest.java
@@ -16,12 +16,8 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v11;
+package org.apache.skywalking.apm.plugin.kafka;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.skywalking.apm.agent.core.conf.Config;
@@ -32,13 +28,7 @@ import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.helper.SegmentRefHelper;
-import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
-import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
-import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
-import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
-import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
-import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerEnhanceRequiredInfo;
-import org.apache.skywalking.apm.plugin.kafka.v1.KafkaConsumerInterceptor;
+import org.apache.skywalking.apm.agent.test.tools.*;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Before;
@@ -48,6 +38,11 @@ import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.KAFKA_CONSUMER;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -92,6 +87,7 @@ public class KafkaConsumerInterceptorTest {
brokers.add("localhost:9092");
brokers.add("localhost:19092");
consumerEnhanceRequiredInfo.setBrokerServers(brokers);
+ consumerEnhanceRequiredInfo.setGroupId("test");
messages = new HashMap<TopicPartition, List<ConsumerRecord>>();
TopicPartition topicPartition = new TopicPartition("test", 1);
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaProducerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptorTest.java
similarity index 88%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaProducerInterceptorTest.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptorTest.java
index 0ca6fd7..4dbde02 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaProducerInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptorTest.java
@@ -16,21 +16,15 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v11;
+package org.apache.skywalking.apm.plugin.kafka;
-import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
-import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
-import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
-import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
-import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
-import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
-import org.apache.skywalking.apm.plugin.kafka.v1.KafkaProducerInterceptor;
+import org.apache.skywalking.apm.agent.test.tools.*;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -38,6 +32,8 @@ import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import java.util.List;
+
import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.KAFKA_PRODUCER;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -71,7 +67,7 @@ public class KafkaProducerInterceptorTest {
private class MockProducerMessage extends ProducerRecord implements EnhancedInstance {
public MockProducerMessage() {
- super("test", "");
+ super("test", "key1", "");
}
@Override public Object getSkyWalkingDynamicField() {
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptorTest.java
similarity index 94%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerConstructorInterceptorTest.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptorTest.java
index 6eee953..cdc3b02 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerConstructorInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptorTest.java
@@ -16,19 +16,19 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v11;
+package org.apache.skywalking.apm.plugin.kafka;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.plugin.kafka.v1.ProducerConstructorInterceptor;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import java.util.ArrayList;
+import java.util.List;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.when;
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/SubscribeMethodInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java
similarity index 92%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/SubscribeMethodInterceptorTest.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java
index 66a0842..a4336d9 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/SubscribeMethodInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java
@@ -16,20 +16,19 @@
*
*/
-package org.apache.skywalking.apm.plugin.kafka.v11;
+package org.apache.skywalking.apm.plugin.kafka;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerEnhanceRequiredInfo;
-import org.apache.skywalking.apm.plugin.kafka.v1.SubscribeMethodInterceptor;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerRecordConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerRecordConstructorInterceptorTest.java
deleted file mode 100644
index 5a1a9d6..0000000
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerRecordConstructorInterceptorTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.apm.plugin.kafka.v11;
-
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.plugin.kafka.v1.ProducerRecordConstructorInterceptor;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-
-@RunWith(MockitoJUnitRunner.class)
-public class ProducerRecordConstructorInterceptorTest {
-
- @Mock
- private ProducerRecordConstructorInterceptor constructorInterceptor;
-
- private EnhancedInstance enhancedInstance = new EnhancedInstance() {
- private String brokerServers;
-
- @Override public Object getSkyWalkingDynamicField() {
- return brokerServers;
- }
-
- @Override public void setSkyWalkingDynamicField(Object value) {
- brokerServers = (String)value;
- }
- };
-
- @Before
- public void setUp() {
- constructorInterceptor = new ProducerRecordConstructorInterceptor();
- }
-
- @Test
- public void testOnConsumer() {
- constructorInterceptor.onConstruct(enhancedInstance, new Object[] {"test"});
- assertThat(enhancedInstance.getSkyWalkingDynamicField().toString(), is("test"));
- }
-}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml
index 96509d5..f5669cd 100644
--- a/apm-sniffer/apm-sdk-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/pom.xml
@@ -59,7 +59,7 @@
<module>elastic-job-2.x-plugin</module>
<module>mongodb-2.x-plugin</module>
<module>httpasyncclient-4.x-plugin</module>
- <module>kafka-v1-plugin</module>
+ <module>kafka-plugin</module>
<module>servicecomb-plugin</module>
<module>hystrix-1.x-plugin</module>
<module>sofarpc-plugin</module>