You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/09/04 01:40:42 UTC

[skywalking] branch master updated: MOD: Modify compatibility of kafka plugin and expand operationName (#3390)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 05d9c0a  MOD: Modify compatibility of kafka plugin and expand operationName (#3390)
05d9c0a is described below

commit 05d9c0a93a1d7e2decdf985d36bffa437f23a922
Author: Stalary <st...@163.com>
AuthorDate: Wed Sep 4 09:40:33 2019 +0800

    MOD: Modify compatibility of kafka plugin and expand operationName (#3390)
    
    * ADD: add operationName length threshold
    
    * MOD:move operationName threshold to Config, simplified code
    
    * MOD:update agent set-up document, clean code
    
    * MOD:add agent.operation_name_threshold conf prefix
    
    * MOD: Modify compatibility of kafka plugin and expand operationName
    
    * MOD: Delete localSpan layer, modify consumer operationName, delete producer key
    
    * MOD: Modify operationName, reduce String operation
    
    * FIX: Fix callback break, the new version of kafka callback is intercepted twice and needs to be judged
    
    * FIX: Fix different problems with KafkaProducer constructor in old and new versions
    
    * MOD: separate code
---
 .../{kafka-v1-plugin => kafka-plugin}/pom.xml      |  2 +-
 .../apm/plugin/kafka}/CallbackInterceptor.java     | 40 ++++++++++-----
 .../kafka}/ConsumerConstructorInterceptor.java     |  6 ++-
 .../plugin/kafka}/ConsumerEnhanceRequiredInfo.java | 11 +++-
 .../plugin/kafka}/KafkaConsumerInterceptor.java    |  8 +--
 .../plugin/kafka}/KafkaProducerInterceptor.java    | 33 +++++++-----
 .../kafka}/ProducerConstructorInterceptor.java     |  7 +--
 .../kafka/ProducerConstructorMapInterceptor.java}  | 19 ++++---
 .../plugin/kafka}/SubscribeMethodInterceptor.java  |  2 +-
 .../define/AbstractKafkaInstrumentation.java       |  2 +-
 .../kafka}/define/CallbackInstrumentation.java     |  4 +-
 .../define/KafkaConsumerInstrumentation.java       | 13 ++---
 .../define/KafkaProducerInstrumentation.java       | 39 ++++++++------
 .../define/KafkaProducerMapInstrumentation.java}   | 37 ++++++++-----
 .../src/main/resources/skywalking-plugin.def       |  8 +--
 .../apm/plugin/kafka}/CallbackInterceptorTest.java | 15 ++----
 .../kafka}/ConsumerConstructorInterceptorTest.java |  5 +-
 .../kafka}/KafkaConsumerInterceptorTest.java       | 20 +++-----
 .../kafka}/KafkaProducerInterceptorTest.java       | 14 ++---
 .../kafka}/ProducerConstructorInterceptorTest.java |  8 +--
 .../kafka}/SubscribeMethodInterceptorTest.java     | 11 ++--
 .../ProducerRecordConstructorInterceptorTest.java  | 60 ----------------------
 apm-sniffer/apm-sdk-plugin/pom.xml                 |  2 +-
 23 files changed, 175 insertions(+), 191 deletions(-)

diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml
similarity index 96%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/pom.xml
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml
index 6287dce..0c830f2 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml
@@ -25,7 +25,7 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>apm-kafka-v1-plugin</artifactId>
+    <artifactId>apm-kafka-plugin</artifactId>
 
     <properties>
         <kafka-clients.version>0.11.0.0</kafka-clients.version>
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/CallbackInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
similarity index 58%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/CallbackInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
index 8e38bd9..cda65f2 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/CallbackInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
@@ -16,42 +16,56 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
 
-import java.lang.reflect.Method;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.skywalking.apm.agent.core.context.ContextManager;
 import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+import java.lang.reflect.Method;
 
+/**
+ * @author zhang xin, stalary
+ **/
 public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {
+
     @Override
     public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
-        MethodInterceptResult result) throws Throwable {
-        AbstractSpan abstractSpan = ContextManager.createLocalSpan("Producer/Callback");
-
+                             MethodInterceptResult result) throws Throwable {
         //Get the SnapshotContext
-        ContextSnapshot contextSnapshot = (ContextSnapshot)objInst.getSkyWalkingDynamicField();
+        ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
         if (null != contextSnapshot) {
+            RecordMetadata metadata = (RecordMetadata) allArguments[0];
+            AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback");
+            activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
+            Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
             ContextManager.continued(contextSnapshot);
         }
     }
 
     @Override
     public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
-        Object ret) throws Throwable {
-        Exception exceptions = (Exception)allArguments[1];
-        if (exceptions != null) {
-            ContextManager.activeSpan().errorOccurred().log(exceptions);
+                              Object ret) throws Throwable {
+        ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
+        if (null != contextSnapshot) {
+            Exception exceptions = (Exception) allArguments[1];
+            if (exceptions != null) {
+                ContextManager.activeSpan().errorOccurred().log(exceptions);
+            }
+            ContextManager.stopSpan();
         }
-        ContextManager.stopSpan();
         return ret;
     }
 
-    @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
-        Class<?>[] argumentsTypes, Throwable t) {
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+                                      Class<?>[] argumentsTypes, Throwable t) {
         ContextManager.activeSpan().errorOccurred().log(t);
     }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
similarity index 91%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerConstructorInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
index 77880a7..35124a3 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerConstructorInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
@@ -16,12 +16,15 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
 
+/**
+ * @author zhang xin, stalary
+ **/
 public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
 
     @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
@@ -29,6 +32,7 @@ public class ConsumerConstructorInterceptor implements InstanceConstructorInterc
         // set the bootstrap server address
         ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
         requiredInfo.setBrokerServers(config.getList("bootstrap.servers"));
+        requiredInfo.setGroupId(config.getString("group.id"));
         objInst.setSkyWalkingDynamicField(requiredInfo);
     }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerEnhanceRequiredInfo.java
similarity index 87%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerEnhanceRequiredInfo.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerEnhanceRequiredInfo.java
index 62018be..8855623 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerEnhanceRequiredInfo.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerEnhanceRequiredInfo.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
 
 import java.util.Collection;
 import java.util.List;
@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.util.StringUtil;
 public class ConsumerEnhanceRequiredInfo {
     private String brokerServers;
     private String topics;
+    private String groupId;
     private long startTime;
 
     public void setBrokerServers(List<String> brokerServers) {
@@ -35,6 +36,10 @@ public class ConsumerEnhanceRequiredInfo {
         this.topics = StringUtil.join(';', topics.toArray(new String[0]));
     }
 
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
     public String getBrokerServers() {
         return brokerServers;
     }
@@ -43,6 +48,10 @@ public class ConsumerEnhanceRequiredInfo {
         return topics;
     }
 
+    public String getGroupId() {
+        return groupId;
+    }
+
     public void setStartTime(long startTime) {
         this.startTime = startTime;
     }
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java
similarity index 94%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaConsumerInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java
index 9c0c5d2..99753c8 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaConsumerInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
 
 import java.lang.reflect.Method;
 import java.util.Iterator;
@@ -37,12 +37,12 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInt
 import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
 
 /**
- * @author  zhang xin
+ * @author  zhang xin, stalary
  */
 public class KafkaConsumerInterceptor implements InstanceMethodsAroundInterceptor {
 
     public static final String OPERATE_NAME_PREFIX = "Kafka/";
-    public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";
+    public static final String CONSUMER_OPERATE_NAME = "/Consumer/";
 
     @Override
     public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
@@ -60,7 +60,7 @@ public class KafkaConsumerInterceptor implements InstanceMethodsAroundIntercepto
         //
         if (records.size() > 0) {
             ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
-            AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME_SUFFIX, null).start(requiredInfo.getStartTime());
+            AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME + requiredInfo.getGroupId(), null).start(requiredInfo.getStartTime());
 
             activeSpan.setComponent(ComponentsDefine.KAFKA_CONSUMER);
             SpanLayer.asMQ(activeSpan);
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaProducerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java
similarity index 73%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaProducerInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java
index 5880a1a..e6b23a8 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaProducerInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java
@@ -16,13 +16,13 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
 
-import java.lang.reflect.Method;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.skywalking.apm.agent.core.context.CarrierItem;
 import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
 import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
 import org.apache.skywalking.apm.agent.core.context.tag.Tags;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
 import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
@@ -31,8 +31,10 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceM
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
 import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
 
+import java.lang.reflect.Method;
+
 /**
- * @author zhang xin
+ * @author zhang xin, stalary
  */
 public class KafkaProducerInterceptor implements InstanceMethodsAroundInterceptor {
 
@@ -41,16 +43,15 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto
 
     @Override
     public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
-        MethodInterceptResult result) throws Throwable {
+                             MethodInterceptResult result) throws Throwable {
 
         ContextCarrier contextCarrier = new ContextCarrier();
 
-        ProducerRecord record = (ProducerRecord)allArguments[0];
-        String topicName = (String)((EnhancedInstance)record).getSkyWalkingDynamicField();
+        ProducerRecord record = (ProducerRecord) allArguments[0];
+        String topicName = record.topic();
+        AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String) objInst.getSkyWalkingDynamicField());
 
-        AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String)objInst.getSkyWalkingDynamicField());
-
-        Tags.MQ_BROKER.set(activeSpan, (String)objInst.getSkyWalkingDynamicField());
+        Tags.MQ_BROKER.set(activeSpan, (String) objInst.getSkyWalkingDynamicField());
         Tags.MQ_TOPIC.set(activeSpan, topicName);
         SpanLayer.asMQ(activeSpan);
         activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
@@ -61,21 +62,25 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto
             record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes());
         }
 
-        EnhancedInstance callbackInstance = (EnhancedInstance)allArguments[1];
+        EnhancedInstance callbackInstance = (EnhancedInstance) allArguments[1];
         if (callbackInstance != null) {
-            callbackInstance.setSkyWalkingDynamicField(ContextManager.capture());
+            ContextSnapshot snapshot = ContextManager.capture();
+            if (null != snapshot) {
+                callbackInstance.setSkyWalkingDynamicField(snapshot);
+            }
         }
     }
 
     @Override
     public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
-        Object ret) throws Throwable {
+                              Object ret) throws Throwable {
         ContextManager.stopSpan();
         return ret;
     }
 
-    @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
-        Class<?>[] argumentsTypes, Throwable t) {
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+                                      Class<?>[] argumentsTypes, Throwable t) {
 
     }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptor.java
similarity index 86%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerConstructorInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptor.java
index 604f080..c9c0d51 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerConstructorInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptor.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
@@ -25,8 +25,9 @@ import org.apache.skywalking.apm.util.StringUtil;
 
 public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {
 
-    @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
-        ProducerConfig config = (ProducerConfig)allArguments[0];
+    @Override
+    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        ProducerConfig config = (ProducerConfig) allArguments[0];
         objInst.setSkyWalkingDynamicField(StringUtil.join(';', config.getList("bootstrap.servers").toArray(new String[0])));
     }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerRecordConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
similarity index 65%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerRecordConstructorInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
index 1a2d5e3..8695995 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerRecordConstructorInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java
@@ -16,14 +16,21 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
 
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+import org.apache.skywalking.apm.util.StringUtil;
 
-public class ProducerRecordConstructorInterceptor implements InstanceConstructorInterceptor {
-    @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
-        String topic = (String)allArguments[0];
-        objInst.setSkyWalkingDynamicField(topic);
+import java.util.Map;
+
+/**
+ * @author stalary
+ */
+public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
+    @Override
+    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        Map<String, Object> config = (Map<String, Object>) allArguments[0];
+        objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
     }
-}
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/SubscribeMethodInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java
similarity index 97%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/SubscribeMethodInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java
index 7c4e655..0453a5e 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/SubscribeMethodInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v1;
+package org.apache.skywalking.apm.plugin.kafka;
 
 import java.lang.reflect.Method;
 import java.util.Collection;
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/AbstractKafkaInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaInstrumentation.java
similarity index 95%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/AbstractKafkaInstrumentation.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaInstrumentation.java
index e737441..cf86711 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/AbstractKafkaInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaInstrumentation.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v1.define;
+package org.apache.skywalking.apm.plugin.kafka.define;
 
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
 
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/CallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/CallbackInstrumentation.java
similarity index 95%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/CallbackInstrumentation.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/CallbackInstrumentation.java
index 922968c..7bf7b40 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/CallbackInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/CallbackInstrumentation.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v1.define;
+package org.apache.skywalking.apm.plugin.kafka.define;
 
 import net.bytebuddy.description.method.MethodDescription;
 import net.bytebuddy.matcher.ElementMatcher;
@@ -31,7 +31,7 @@ public class CallbackInstrumentation extends AbstractKafkaInstrumentation {
 
     public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.Callback";
     public static final String ENHANCE_METHOD = "onCompletion";
-    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.CallbackInterceptor";
+    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.CallbackInterceptor";
 
     @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
         return new ConstructorInterceptPoint[0];
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
similarity index 90%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaConsumerInstrumentation.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
index 0aef949..cfc9471 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaConsumerInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v1.define;
+package org.apache.skywalking.apm.plugin.kafka.define;
 
 import net.bytebuddy.description.method.MethodDescription;
 import net.bytebuddy.matcher.ElementMatcher;
@@ -38,18 +38,19 @@ import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName
  *  4. Stop the entry span when <code>pollOnce</code> method finished.
  * </pre>
  *
- * @author zhang xin
+ * @author zhang xin,stalary
  */
 public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
 
     public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerConfig";
-    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.ConsumerConstructorInterceptor";
-    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.KafkaConsumerInterceptor";
+    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConsumerConstructorInterceptor";
+    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor";
     public static final String ENHANCE_METHOD = "pollOnce";
+    public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
     public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
     public static final String SUBSCRIBE_METHOD = "subscribe";
     public static final String SUBSCRIBE_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerRebalanceListener";
-    public static final String SUBSCRIBE_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.SubscribeMethodInterceptor";
+    public static final String SUBSCRIBE_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.kafka.SubscribeMethodInterceptor";
 
     @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
         return new ConstructorInterceptPoint[] {
@@ -69,7 +70,7 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
         return new InstanceMethodsInterceptPoint[] {
             new InstanceMethodsInterceptPoint() {
                 @Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
-                    return named(ENHANCE_METHOD);
+                    return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD));
                 }
 
                 @Override public String getMethodsInterceptor() {
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaProducerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerInstrumentation.java
similarity index 71%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaProducerInstrumentation.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerInstrumentation.java
index fa702c8..9fe0d3b 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaProducerInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerInstrumentation.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v1.define;
+package org.apache.skywalking.apm.plugin.kafka.define;
 
 import net.bytebuddy.description.method.MethodDescription;
 import net.bytebuddy.matcher.ElementMatcher;
@@ -39,48 +39,57 @@ import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName
  *  3. Stop the exit span when <code>send</code> method finished.
  * </pre>
  *
- * @author zhang xin
+ * @author zhang xin, stalary
  */
 public class KafkaProducerInstrumentation extends AbstractKafkaInstrumentation {
 
-    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.KafkaProducerInterceptor";
+    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaProducerInterceptor";
     public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer";
-    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.ProducerConstructorInterceptor";
+    public static final String ENHANCE_METHOD = "doSend";
+    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ProducerConstructorInterceptor";
     public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "org.apache.kafka.clients.producer.ProducerConfig";
 
-    @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
-        return new ConstructorInterceptPoint[] {
+    @Override
+    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[]{
             new ConstructorInterceptPoint() {
-                @Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
+                @Override
+                public ElementMatcher<MethodDescription> getConstructorMatcher() {
                     return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG);
                 }
 
-                @Override public String getConstructorInterceptor() {
+                @Override
+                public String getConstructorInterceptor() {
                     return CONSTRUCTOR_INTERCEPTOR_CLASS;
                 }
             }
         };
     }
 
-    @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
-        return new InstanceMethodsInterceptPoint[] {
+    @Override
+    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[]{
             new InstanceMethodsInterceptPoint() {
-                @Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
-                    return named("doSend");
+                @Override
+                public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return named(ENHANCE_METHOD);
                 }
 
-                @Override public String getMethodsInterceptor() {
+                @Override
+                public String getMethodsInterceptor() {
                     return INTERCEPTOR_CLASS;
                 }
 
-                @Override public boolean isOverrideArgs() {
+                @Override
+                public boolean isOverrideArgs() {
                     return false;
                 }
             }
         };
     }
 
-    @Override protected ClassMatch enhanceClass() {
+    @Override
+    protected ClassMatch enhanceClass() {
         return byName(ENHANCE_CLASS);
     }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/ProducerRecordInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerMapInstrumentation.java
similarity index 60%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/ProducerRecordInstrumentation.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerMapInstrumentation.java
index e40f14b..8888c80 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/ProducerRecordInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerMapInstrumentation.java
@@ -15,8 +15,7 @@
  * limitations under the License.
  *
  */
-
-package org.apache.skywalking.apm.plugin.kafka.v1.define;
+package org.apache.skywalking.apm.plugin.kafka.define;
 
 import net.bytebuddy.description.method.MethodDescription;
 import net.bytebuddy.matcher.ElementMatcher;
@@ -24,33 +23,43 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterc
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
 import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
 
-import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
 import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
 
-public class ProducerRecordInstrumentation extends AbstractKafkaInstrumentation {
+/**
+ * after version 2.1.0 use Map to config
+ * @author stalary
+ */
+public class KafkaProducerMapInstrumentation extends AbstractKafkaInstrumentation {
 
-    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.ProducerRecordConstructorInterceptor";
-    public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.ProducerRecord";
+    public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer";
+    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ProducerConstructorMapInterceptor";
+    public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "java.util.Map";
 
-    @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
-        return new ConstructorInterceptPoint[] {
+    @Override
+    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[]{
             new ConstructorInterceptPoint() {
-                @Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
-                    return takesArguments(6);
+                @Override
+                public ElementMatcher<MethodDescription> getConstructorMatcher() {
+                    return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG);
                 }
 
-                @Override public String getConstructorInterceptor() {
+                @Override
+                public String getConstructorInterceptor() {
                     return CONSTRUCTOR_INTERCEPTOR_CLASS;
                 }
             }
         };
     }
 
-    @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+    @Override
+    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
         return new InstanceMethodsInterceptPoint[0];
     }
 
-    @Override protected ClassMatch enhanceClass() {
+    @Override
+    protected ClassMatch enhanceClass() {
         return byName(ENHANCE_CLASS);
     }
-}
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def
similarity index 67%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/resources/skywalking-plugin.def
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def
index 59e564a..1a2fb5e 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/resources/skywalking-plugin.def
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.CallbackInstrumentation
-kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.KafkaConsumerInstrumentation
-kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.KafkaProducerInstrumentation
-kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.ProducerRecordInstrumentation
+kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstrumentation
+kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation
+kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation
+kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/CallbackInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java
similarity index 89%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/CallbackInterceptorTest.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java
index 0629b20..ac6c206 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/CallbackInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java
@@ -16,9 +16,8 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v11;
+package org.apache.skywalking.apm.plugin.kafka;
 
-import java.util.List;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.skywalking.apm.agent.core.context.MockContextSnapshot;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
@@ -27,13 +26,7 @@ import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
 import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
-import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
-import org.apache.skywalking.apm.agent.test.tools.SegmentRefAssert;
-import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
-import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
-import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
-import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
-import org.apache.skywalking.apm.plugin.kafka.v1.CallbackInterceptor;
+import org.apache.skywalking.apm.agent.test.tools.*;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -43,6 +36,8 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.modules.junit4.PowerMockRunnerDelegate;
 
+import java.util.List;
+
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
@@ -143,6 +138,6 @@ public class CallbackInterceptorTest {
     }
 
     private void assertCallbackSpan(AbstractTracingSpan span) {
-        assertThat(span.getOperationName(), is("Producer/Callback"));
+        assertThat(span.getOperationName(), is("Kafka/Producer/Callback"));
     }
 }
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ConsumerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java
similarity index 92%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ConsumerConstructorInterceptorTest.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java
index ad642dd..dff6400 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ConsumerConstructorInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java
@@ -16,17 +16,16 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v11;
+package org.apache.skywalking.apm.plugin.kafka;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerConstructorInterceptor;
-import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerEnhanceRequiredInfo;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
+
 import java.util.ArrayList;
 import java.util.List;
 
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptorTest.java
similarity index 91%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaConsumerInterceptorTest.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptorTest.java
index dc8a832..aa3543b 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaConsumerInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptorTest.java
@@ -16,12 +16,8 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v11;
+package org.apache.skywalking.apm.plugin.kafka;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.skywalking.apm.agent.core.conf.Config;
@@ -32,13 +28,7 @@ import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
 import org.apache.skywalking.apm.agent.test.helper.SegmentRefHelper;
-import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
-import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
-import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
-import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
-import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
-import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerEnhanceRequiredInfo;
-import org.apache.skywalking.apm.plugin.kafka.v1.KafkaConsumerInterceptor;
+import org.apache.skywalking.apm.agent.test.tools.*;
 import org.hamcrest.MatcherAssert;
 import org.junit.After;
 import org.junit.Before;
@@ -48,6 +38,11 @@ import org.junit.runner.RunWith;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.modules.junit4.PowerMockRunnerDelegate;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.KAFKA_CONSUMER;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
@@ -92,6 +87,7 @@ public class KafkaConsumerInterceptorTest {
         brokers.add("localhost:9092");
         brokers.add("localhost:19092");
         consumerEnhanceRequiredInfo.setBrokerServers(brokers);
+        consumerEnhanceRequiredInfo.setGroupId("test");
 
         messages = new HashMap<TopicPartition, List<ConsumerRecord>>();
         TopicPartition topicPartition = new TopicPartition("test", 1);
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaProducerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptorTest.java
similarity index 88%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaProducerInterceptorTest.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptorTest.java
index 0ca6fd7..4dbde02 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaProducerInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptorTest.java
@@ -16,21 +16,15 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v11;
+package org.apache.skywalking.apm.plugin.kafka;
 
-import java.util.List;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
 import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
 import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
-import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
-import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
-import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
-import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
-import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
-import org.apache.skywalking.apm.plugin.kafka.v1.KafkaProducerInterceptor;
+import org.apache.skywalking.apm.agent.test.tools.*;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -38,6 +32,8 @@ import org.junit.runner.RunWith;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.modules.junit4.PowerMockRunnerDelegate;
 
+import java.util.List;
+
 import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.KAFKA_PRODUCER;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -71,7 +67,7 @@ public class KafkaProducerInterceptorTest {
     private class MockProducerMessage extends ProducerRecord implements EnhancedInstance {
 
         public MockProducerMessage() {
-            super("test", "");
+            super("test", "key1", "");
         }
 
         @Override public Object getSkyWalkingDynamicField() {
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptorTest.java
similarity index 94%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerConstructorInterceptorTest.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptorTest.java
index 6eee953..cdc3b02 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerConstructorInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptorTest.java
@@ -16,19 +16,19 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v11;
+package org.apache.skywalking.apm.plugin.kafka;
 
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.plugin.kafka.v1.ProducerConstructorInterceptor;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.mockito.Mockito.when;
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/SubscribeMethodInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java
similarity index 92%
rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/SubscribeMethodInterceptorTest.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java
index 66a0842..a4336d9 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/SubscribeMethodInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java
@@ -16,20 +16,19 @@
  *
  */
 
-package org.apache.skywalking.apm.plugin.kafka.v11;
+package org.apache.skywalking.apm.plugin.kafka;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerEnhanceRequiredInfo;
-import org.apache.skywalking.apm.plugin.kafka.v1.SubscribeMethodInterceptor;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerRecordConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerRecordConstructorInterceptorTest.java
deleted file mode 100644
index 5a1a9d6..0000000
--- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerRecordConstructorInterceptorTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.apm.plugin.kafka.v11;
-
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.plugin.kafka.v1.ProducerRecordConstructorInterceptor;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-
-@RunWith(MockitoJUnitRunner.class)
-public class ProducerRecordConstructorInterceptorTest {
-
-    @Mock
-    private ProducerRecordConstructorInterceptor constructorInterceptor;
-
-    private EnhancedInstance enhancedInstance = new EnhancedInstance() {
-        private String brokerServers;
-
-        @Override public Object getSkyWalkingDynamicField() {
-            return brokerServers;
-        }
-
-        @Override public void setSkyWalkingDynamicField(Object value) {
-            brokerServers = (String)value;
-        }
-    };
-
-    @Before
-    public void setUp() {
-        constructorInterceptor = new ProducerRecordConstructorInterceptor();
-    }
-
-    @Test
-    public void testOnConsumer() {
-        constructorInterceptor.onConstruct(enhancedInstance, new Object[] {"test"});
-        assertThat(enhancedInstance.getSkyWalkingDynamicField().toString(), is("test"));
-    }
-}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml
index 96509d5..f5669cd 100644
--- a/apm-sniffer/apm-sdk-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/pom.xml
@@ -59,7 +59,7 @@
         <module>elastic-job-2.x-plugin</module>
         <module>mongodb-2.x-plugin</module>
         <module>httpasyncclient-4.x-plugin</module>
-        <module>kafka-v1-plugin</module>
+        <module>kafka-plugin</module>
         <module>servicecomb-plugin</module>
         <module>hystrix-1.x-plugin</module>
         <module>sofarpc-plugin</module>