You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@skywalking.apache.org by GitBox <gi...@apache.org> on 2018/01/29 03:09:18 UTC

[GitHub] wu-sheng closed pull request #772: [Agent] Support kafka framework

wu-sheng closed pull request #772: [Agent] Support kafka framework
URL: https://github.com/apache/incubator-skywalking/pull/772
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of such a pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
index b23f30cf3..220b1062c 100644
--- a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
+++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
@@ -78,6 +78,8 @@
 
     public static final OfficialComponent HTTP_ASYNC_CLIENT = new OfficialComponent(26, "httpasyncclient");
 
+    public static final OfficialComponent KAFKA = new OfficialComponent(27, "Kafka");
+
     private static ComponentsDefine INSTANCE = new ComponentsDefine();
 
     private String[] components;
@@ -87,7 +89,7 @@ public static ComponentsDefine getInstance() {
     }
 
     public ComponentsDefine() {
-        components = new String[27];
+        components = new String[28];
         addComponent(TOMCAT);
         addComponent(HTTPCLIENT);
         addComponent(DUBBO);
@@ -114,6 +116,7 @@ public ComponentsDefine() {
         addComponent(ELASTIC_JOB);
         addComponent(ROCKET_MQ);
         addComponent(HTTP_ASYNC_CLIENT);
+        addComponent(KAFKA);
     }
 
     private void addComponent(OfficialComponent component) {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractSpan.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractSpan.java
index ada06d3d0..3ce6518d3 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractSpan.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractSpan.java
@@ -116,4 +116,6 @@
      * @param ref segment ref
      */
     void ref(TraceSegmentRef ref);
+
+    AbstractSpan start(long starttime);
 }
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
index 90d174d02..2028215ae 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
@@ -239,6 +239,12 @@ public AbstractTracingSpan setComponent(String componentName) {
         return this;
     }
 
+    @Override
+    public AbstractSpan start(long startTime) {
+        this.startTime = startTime;
+        return this;
+    }
+
     public SpanObject.Builder transform() {
         SpanObject.Builder spanBuilder = SpanObject.newBuilder();
 
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/NoopSpan.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/NoopSpan.java
index 4be30d1b7..df64b9015 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/NoopSpan.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/NoopSpan.java
@@ -102,4 +102,8 @@ public AbstractSpan tag(String key, String value) {
 
     @Override public void ref(TraceSegmentRef ref) {
     }
+
+    @Override public AbstractSpan start(long startTime) {
+        return this;
+    }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/pom.xml
new file mode 100644
index 000000000..33a6c4b19
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>apm-sdk-plugin</artifactId>
+        <groupId>org.apache.skywalking</groupId>
+        <version>5.0.0-alpha</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>apm-kafka-v1-plugin</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.11.0.0</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/CallbackInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/CallbackInterceptor.java
new file mode 100644
index 000000000..3d3f38e9a
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/CallbackInterceptor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v1;
+
+import java.lang.reflect.Method;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+        MethodInterceptResult result) throws Throwable {
+        AbstractSpan abstractSpan = ContextManager.createLocalSpan("Producer/Callback");
+
+        // Get the ContextSnapshot stored on the callback instance
+        ContextSnapshot contextSnapshot = (ContextSnapshot)objInst.getSkyWalkingDynamicField();
+        ContextManager.continued(contextSnapshot);
+    }
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+        Object ret) throws Throwable {
+        Exception exceptions = (Exception)allArguments[1];
+        if (exceptions != null) {
+            ContextManager.activeSpan().errorOccurred().log(exceptions);
+        }
+        ContextManager.stopSpan();
+        return ret;
+    }
+
+    @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+        ContextManager.activeSpan().errorOccurred().log(t);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerConstructorInterceptor.java
new file mode 100644
index 000000000..77880a7ce
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerConstructorInterceptor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v1;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
+
+    @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        ConsumerConfig config = (ConsumerConfig)allArguments[0];
+        // set the bootstrap server address
+        ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
+        requiredInfo.setBrokerServers(config.getList("bootstrap.servers"));
+        objInst.setSkyWalkingDynamicField(requiredInfo);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerEnhanceRequiredInfo.java
new file mode 100644
index 000000000..62018beb6
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerEnhanceRequiredInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v1;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.skywalking.apm.util.StringUtil;
+
+public class ConsumerEnhanceRequiredInfo {
+    private String brokerServers;
+    private String topics;
+    private long startTime;
+
+    public void setBrokerServers(List<String> brokerServers) {
+        this.brokerServers =  StringUtil.join(';', brokerServers.toArray(new String[0]));
+    }
+
+    public void setTopics(Collection<String> topics) {
+        this.topics = StringUtil.join(';', topics.toArray(new String[0]));
+    }
+
+    public String getBrokerServers() {
+        return brokerServers;
+    }
+
+    public String getTopics() {
+        return topics;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaConsumerInterceptor.java
new file mode 100644
index 000000000..db790b8b0
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaConsumerInterceptor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v1;
+
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * @author zhang xin
+ */
+public class KafkaConsumerInterceptor implements InstanceMethodsAroundInterceptor {
+
+    public static final String OPERATE_NAME_PREFIX = "Kafka/";
+    public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";
+
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+        MethodInterceptResult result) throws Throwable {
+        ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
+        requiredInfo.setStartTime(System.currentTimeMillis());
+    }
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+        Object ret) throws Throwable {
+        Map<TopicPartition, List<ConsumerRecord<?, ?>>> records = (Map<TopicPartition, List<ConsumerRecord<?, ?>>>)ret;
+        //
+        // The entry span is created only when the consumer has received at least one message.
+        //
+        if (records.size() > 0) {
+            ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
+            AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME_SUFFIX, null).start(requiredInfo.getStartTime());
+
+            activeSpan.setComponent(ComponentsDefine.KAFKA);
+            SpanLayer.asMQ(activeSpan);
+            Tags.MQ_BROKER.set(activeSpan, requiredInfo.getBrokerServers());
+            Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopics());
+
+            for (List<ConsumerRecord<?, ?>> consumerRecords : records.values()) {
+                for (ConsumerRecord<?, ?> record : consumerRecords) {
+                    ContextCarrier contextCarrier = new ContextCarrier();
+
+                    CarrierItem next = contextCarrier.items();
+                    while (next.hasNext()) {
+                        next = next.next();
+                        Iterator<Header> iterator = record.headers().headers(next.getHeadKey()).iterator();
+                        if (iterator.hasNext()) {
+                            next.setHeadValue(new String(iterator.next().value()));
+                        }
+                    }
+                    ContextManager.extract(contextCarrier);
+                }
+            }
+            ContextManager.stopSpan();
+        }
+        return ret;
+    }
+
+    @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+        ContextManager.activeSpan().errorOccurred().log(t);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaProducerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaProducerInterceptor.java
new file mode 100644
index 000000000..1ec0d68ee
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaProducerInterceptor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v1;
+
+import java.lang.reflect.Method;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * @author zhang xin
+ */
+public class KafkaProducerInterceptor implements InstanceMethodsAroundInterceptor {
+
+    public static final String OPERATE_NAME_PREFIX = "Kafka/";
+    public static final String PRODUCER_OPERATE_NAME_SUFFIX = "/Producer";
+
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+        MethodInterceptResult result) throws Throwable {
+
+        ContextCarrier contextCarrier = new ContextCarrier();
+
+        ProducerRecord record = (ProducerRecord)allArguments[0];
+        String topicName = (String)((EnhancedInstance)record).getSkyWalkingDynamicField();
+
+        AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String)objInst.getSkyWalkingDynamicField());
+
+        Tags.MQ_BROKER.set(activeSpan, (String)objInst.getSkyWalkingDynamicField());
+        Tags.MQ_TOPIC.set(activeSpan, topicName);
+        SpanLayer.asMQ(activeSpan);
+        activeSpan.setComponent(ComponentsDefine.KAFKA);
+
+        CarrierItem next = contextCarrier.items();
+        while (next.hasNext()) {
+            next = next.next();
+            record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes());
+        }
+
+        EnhancedInstance callbackInstance = (EnhancedInstance)allArguments[1];
+        if (callbackInstance != null) {
+            callbackInstance.setSkyWalkingDynamicField(ContextManager.capture());
+        }
+    }
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+        Object ret) throws Throwable {
+        ContextManager.stopSpan();
+        return ret;
+    }
+
+    @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerConstructorInterceptor.java
new file mode 100644
index 000000000..604f080c8
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerConstructorInterceptor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v1;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+import org.apache.skywalking.apm.util.StringUtil;
+
+public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {
+
+    @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        ProducerConfig config = (ProducerConfig)allArguments[0];
+        objInst.setSkyWalkingDynamicField(StringUtil.join(';', config.getList("bootstrap.servers").toArray(new String[0])));
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerRecordConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerRecordConstructorInterceptor.java
new file mode 100644
index 000000000..1a2d5e3ff
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerRecordConstructorInterceptor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v1;
+
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+public class ProducerRecordConstructorInterceptor implements InstanceConstructorInterceptor {
+    @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        String topic = (String)allArguments[0];
+        objInst.setSkyWalkingDynamicField(topic);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/SubscribeMethodInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/SubscribeMethodInterceptor.java
new file mode 100644
index 000000000..7c4e65598
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/SubscribeMethodInterceptor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v1;
+
+import java.lang.reflect.Method;
+import java.util.Collection;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+public class SubscribeMethodInterceptor implements InstanceMethodsAroundInterceptor {
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+        MethodInterceptResult result) throws Throwable {
+        ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
+        requiredInfo.setTopics((Collection<String>)allArguments[0]);
+
+        objInst.setSkyWalkingDynamicField(requiredInfo);
+    }
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+        Object ret) throws Throwable {
+        return ret;
+    }
+
+    @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+        ContextManager.activeSpan().errorOccurred().log(t);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/CallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/CallbackInstrumentation.java
new file mode 100644
index 000000000..e478d6204
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/CallbackInstrumentation.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v1.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.kafka.v1.CallbackInterceptor;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+
+/**
+ * {@link CallbackInstrumentation} defines that {@link CallbackInterceptor} intercepts the method
+ * <code>onCompletion</code> of the classes selected by a hierarchy match on
+ * <code>org.apache.kafka.clients.producer.Callback</code>.
+ *
+ * @author zhangxin
+ */
+public class CallbackInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+    public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.Callback";
+    public static final String ENHANCE_METHOD = "onCompletion";
+    // Must name the package the interceptor is declared in (see the CallbackInterceptor import of this file).
+    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.CallbackInterceptor";
+
+    /**
+     * @return an empty array, no constructor is intercepted.
+     */
+    @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[0];
+    }
+
+    /**
+     * @return a single intercept point binding methods named {@link #ENHANCE_METHOD} to {@link #INTERCEPTOR_CLASS}.
+     */
+    @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[] {
+            new InstanceMethodsInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return named(ENHANCE_METHOD);
+                }
+
+                @Override public String getMethodsInterceptor() {
+                    return INTERCEPTOR_CLASS;
+                }
+
+                @Override public boolean isOverrideArgs() {
+                    return false;
+                }
+            }
+        };
+    }
+
+    /**
+     * @return a hierarchy match on {@link #ENHANCE_CLASS}.
+     */
+    @Override protected ClassMatch enhanceClass() {
+        return byHierarchyMatch(new String[] {ENHANCE_CLASS});
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaConsumerInstrumentation.java
new file mode 100644
index 000000000..e4e1445f6
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaConsumerInstrumentation.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v1.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.kafka.v1.KafkaConsumerInterceptor;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * {@link KafkaConsumerInstrumentation} defines that {@link KafkaConsumerInterceptor}
+ * intercepts the method <code>pollOnce</code> in the class <code>org.apache.kafka.clients.consumer.KafkaConsumer</code>.
+ * Here are the intercept process steps.
+ *
+ * <pre>
+ *  1. Record the topics when the client invokes the <code>subscribe</code> method
+ *  2. Create the entry span when the client invokes the method <code>pollOnce</code>.
+ *  3. Extract all the <code>Trace Context</code> by iterating over all <code>ConsumerRecord</code>
+ *  4. Stop the entry span when the <code>pollOnce</code> method finishes.
+ * </pre>
+ *
+ * @author zhang xin
+ */
+public class KafkaConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+    // The interceptor names below must use the package the interceptors are declared in (...kafka.v1).
+    public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerConfig";
+    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.ConsumerConstructorInterceptor";
+    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.KafkaConsumerInterceptor";
+    public static final String ENHANCE_METHOD = "pollOnce";
+    public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
+    public static final String SUBSCRIBE_METHOD = "subscribe";
+    public static final String SUBSCRIBE_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerRebalanceListener";
+    public static final String SUBSCRIBE_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.SubscribeMethodInterceptor";
+
+    /**
+     * @return one intercept point for the constructors whose argument at index 0 has the type
+     * {@link #CONSTRUCTOR_INTERCEPT_TYPE}.
+     */
+    @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[] {
+            new ConstructorInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
+                    return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
+                }
+
+                @Override public String getConstructorInterceptor() {
+                    return CONSTRUCTOR_INTERCEPTOR_CLASS;
+                }
+            }
+        };
+    }
+
+    /**
+     * @return two intercept points: one for {@link #ENHANCE_METHOD} and one for {@link #SUBSCRIBE_METHOD}.
+     */
+    @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[] {
+            new InstanceMethodsInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return named(ENHANCE_METHOD);
+                }
+
+                @Override public String getMethodsInterceptor() {
+                    return INTERCEPTOR_CLASS;
+                }
+
+                @Override public boolean isOverrideArgs() {
+                    return false;
+                }
+            },
+            new InstanceMethodsInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    // Selects every subscribe overload whose argument at index 1 is a ConsumerRebalanceListener.
+                    return named(SUBSCRIBE_METHOD).and(takesArgumentWithType(1, SUBSCRIBE_INTERCEPT_TYPE));
+                }
+
+                @Override public String getMethodsInterceptor() {
+                    return SUBSCRIBE_INTERCEPTOR_CLASS;
+                }
+
+                @Override public boolean isOverrideArgs() {
+                    return false;
+                }
+            }
+        };
+    }
+
+    /**
+     * @return a match by name on {@link #ENHANCE_CLASS}.
+     */
+    @Override protected ClassMatch enhanceClass() {
+        return byName(ENHANCE_CLASS);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaProducerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaProducerInstrumentation.java
new file mode 100644
index 000000000..54932a8af
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaProducerInstrumentation.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v1.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.kafka.v1.KafkaProducerInterceptor;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * {@link KafkaProducerInstrumentation} defines that {@link KafkaProducerInterceptor}
+ * intercepts the method <code>doSend</code> in the class <code>org.apache.kafka.clients.producer.KafkaProducer</code>.
+ * Here are the intercept process steps.
+ *
+ * <pre>
+ *  1. Record the broker address when the client creates the <code>org.apache.kafka.clients.producer.KafkaProducer</code>
+ * instance
+ *  2. Create the exit span when the intercepted <code>doSend</code> method is invoked
+ *  3. Inject the context to {@link org.apache.kafka.clients.producer.ProducerRecord#headers}
+ *  4. Stop the exit span when the <code>doSend</code> method finishes.
+ * </pre>
+ *
+ * @author zhang xin
+ */
+public class KafkaProducerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+    // The interceptor names below must use the package the interceptors are declared in (...kafka.v1).
+    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.KafkaProducerInterceptor";
+    public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer";
+    public static final String ENHANCE_METHOD = "doSend";
+    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.ProducerConstructorInterceptor";
+    public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "org.apache.kafka.clients.producer.ProducerConfig";
+
+    /**
+     * @return one intercept point for the constructors whose argument at index 0 has the type
+     * {@link #CONSTRUCTOR_INTERCEPTOR_FLAG}.
+     */
+    @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[] {
+            new ConstructorInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
+                    return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG);
+                }
+
+                @Override public String getConstructorInterceptor() {
+                    return CONSTRUCTOR_INTERCEPTOR_CLASS;
+                }
+            }
+        };
+    }
+
+    /**
+     * @return a single intercept point binding methods named {@link #ENHANCE_METHOD} to {@link #INTERCEPTOR_CLASS}.
+     */
+    @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[] {
+            new InstanceMethodsInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return named(ENHANCE_METHOD);
+                }
+
+                @Override public String getMethodsInterceptor() {
+                    return INTERCEPTOR_CLASS;
+                }
+
+                @Override public boolean isOverrideArgs() {
+                    return false;
+                }
+            }
+        };
+    }
+
+    /**
+     * @return a match by name on {@link #ENHANCE_CLASS}.
+     */
+    @Override protected ClassMatch enhanceClass() {
+        return byName(ENHANCE_CLASS);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/ProducerRecordInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/ProducerRecordInstrumentation.java
new file mode 100644
index 000000000..fe4d8b203
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/ProducerRecordInstrumentation.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v1.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.kafka.v1.ProducerRecordConstructorInterceptor;
+
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * {@link ProducerRecordInstrumentation} defines that {@link ProducerRecordConstructorInterceptor}
+ * intercepts the constructor in the class <code>org.apache.kafka.clients.producer.ProducerRecord</code> to record the
+ * topic name and propagate the <code>Context</code> of the trace.
+ *
+ * @author zhang xin
+ * @see org.apache.skywalking.apm.plugin.kafka.v1.define.KafkaProducerInstrumentation
+ */
+public class ProducerRecordInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+    // Must name the package the interceptor is declared in (see the ProducerRecordConstructorInterceptor import).
+    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.ProducerRecordConstructorInterceptor";
+    public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.ProducerRecord";
+
+    /**
+     * @return one intercept point for the constructors that take six arguments.
+     */
+    @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[] {
+            new ConstructorInterceptPoint() {
+                @Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
+                    return takesArguments(6);
+                }
+
+                @Override public String getConstructorInterceptor() {
+                    return CONSTRUCTOR_INTERCEPTOR_CLASS;
+                }
+            }
+        };
+    }
+
+    /**
+     * @return an empty array, no instance method is intercepted.
+     */
+    @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+        return new InstanceMethodsInterceptPoint[0];
+    }
+
+    /**
+     * @return a match by name on {@link #ENHANCE_CLASS}.
+     */
+    @Override protected ClassMatch enhanceClass() {
+        return byName(ENHANCE_CLASS);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/resources/skywalking-plugin.def
new file mode 100644
index 000000000..a6e5972a0
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/resources/skywalking-plugin.def
@@ -0,0 +1,4 @@
+kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.CallbackInstrumentation
+kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.KafkaConsumerInstrumentation
+kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.KafkaProducerInstrumentation
+kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.ProducerRecordInstrumentation
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/CallbackInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/CallbackInterceptorTest.java
new file mode 100644
index 000000000..17ddaf443
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/CallbackInterceptorTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v11;
+
+import java.util.List;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.skywalking.apm.agent.core.context.MockContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentRefAssert;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.apache.skywalking.apm.plugin.kafka.v1.CallbackInterceptor;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link CallbackInterceptor}. Each test drives <code>beforeMethod</code> and
+ * <code>afterMethod</code> by hand and then inspects the trace segment collected in the segment storage.
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+@PrepareForTest({RecordMetadata.class})
+public class CallbackInterceptorTest {
+
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+
+    @Rule
+    public AgentServiceRule serviceRule = new AgentServiceRule();
+
+    @Mock
+    private RecordMetadata recordMetadata;
+
+    private CallbackInterceptor interceptor;
+
+    private Object[] successArguments;
+    private Object[] failureArguments;
+    private Class[] argumentTypes;
+
+    // Enhanced callback stub: its dynamic field always yields the mocked context snapshot, writes are ignored.
+    private EnhancedInstance enhancedCallback = new EnhancedInstance() {
+        @Override public Object getSkyWalkingDynamicField() {
+            return MockContextSnapshot.INSTANCE.mockContextSnapshot();
+        }
+
+        @Override public void setSkyWalkingDynamicField(Object value) {
+
+        }
+    };
+
+    @Before
+    public void setUp() {
+        interceptor = new CallbackInterceptor();
+
+        // onCompletion arguments: the record metadata plus either no exception or a RuntimeException.
+        successArguments = new Object[] {recordMetadata, null};
+        failureArguments = new Object[] {recordMetadata, new RuntimeException()};
+        argumentTypes = new Class[] {RecordMetadata.class, Exception.class};
+    }
+
+    @Test
+    public void testCallbackWithoutException() throws Throwable {
+        TraceSegment segment = interceptAndGetSingleSegment(successArguments);
+
+        assertCallbackSpan(singleSpanOf(segment));
+
+        assertCallbackSegmentRef(segment.getRefs());
+    }
+
+    @Test
+    public void testCallbackWithException() throws Throwable {
+        TraceSegment segment = interceptAndGetSingleSegment(failureArguments);
+
+        assertCallbackSpanWithException(singleSpanOf(segment));
+
+        assertCallbackSegmentRef(segment.getRefs());
+    }
+
+    /**
+     * Runs the interceptor around a callback invocation and returns the one segment expected in the storage.
+     */
+    private TraceSegment interceptAndGetSingleSegment(Object[] callbackArguments) throws Throwable {
+        interceptor.beforeMethod(enhancedCallback, null, callbackArguments, argumentTypes, null);
+        interceptor.afterMethod(enhancedCallback, null, callbackArguments, argumentTypes, null);
+
+        List<TraceSegment> collectedSegments = segmentStorage.getTraceSegments();
+        assertThat(collectedSegments.size(), is(1));
+        return collectedSegments.get(0);
+    }
+
+    /**
+     * Asserts that the segment holds exactly one span and returns it.
+     */
+    private AbstractTracingSpan singleSpanOf(TraceSegment segment) {
+        List<AbstractTracingSpan> spansOfSegment = SegmentHelper.getSpans(segment);
+        assertThat(spansOfSegment.size(), is(1));
+        return spansOfSegment.get(0);
+    }
+
+    private void assertCallbackSpanWithException(AbstractTracingSpan span) {
+        assertCallbackSpan(span);
+
+        SpanAssert.assertException(SpanHelper.getLogs(span).get(0), RuntimeException.class);
+        assertThat(SpanHelper.getErrorOccurred(span), is(true));
+    }
+
+    private void assertCallbackSegmentRef(List<TraceSegmentRef> refs) {
+        assertThat(refs.size(), is(1));
+
+        TraceSegmentRef onlyRef = refs.get(0);
+        SegmentRefAssert.assertSpanId(onlyRef, 1);
+        assertThat(onlyRef.getEntryOperationName(), is("/for-test-entryOperationName"));
+    }
+
+    private void assertCallbackSpan(AbstractTracingSpan span) {
+        assertThat(span.getOperationName(), is("Producer/Callback"));
+    }
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ConsumerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ConsumerConstructorInterceptorTest.java
new file mode 100644
index 000000000..e6125ba78
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ConsumerConstructorInterceptorTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v11;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerConstructorInterceptor;
+import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerEnhanceRequiredInfo;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link ConsumerConstructorInterceptor}: checks that constructing with a {@link ConsumerConfig}
+ * stores the configured bootstrap servers in the {@link ConsumerEnhanceRequiredInfo} of the enhanced instance.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ConsumerConstructorInterceptorTest {
+
+    @Mock
+    private ConsumerConfig consumerConfig;
+
+    // Class under test: a real instance created in setUp(), so it must not be annotated with @Mock.
+    private ConsumerConstructorInterceptor constructorInterceptor;
+
+    // Enhanced instance stub that simply keeps the value written to its dynamic field.
+    private EnhancedInstance enhancedInstance = new EnhancedInstance() {
+        private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
+
+        @Override public Object getSkyWalkingDynamicField() {
+            return consumerEnhanceRequiredInfo;
+        }
+
+        @Override public void setSkyWalkingDynamicField(Object value) {
+            consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo)value;
+        }
+    };
+
+    @Before
+    public void setUp() {
+        // The mocked config reports two bootstrap servers.
+        List<String> mockBootstrapServers = new ArrayList<String>();
+        mockBootstrapServers.add("localhost:9092");
+        mockBootstrapServers.add("localhost:19092");
+        when(consumerConfig.getList("bootstrap.servers")).thenReturn(mockBootstrapServers);
+
+        constructorInterceptor = new ConsumerConstructorInterceptor();
+    }
+
+    @Test
+    public void testOnConsumer() {
+        constructorInterceptor.onConstruct(enhancedInstance, new Object[] {consumerConfig});
+        ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo)enhancedInstance.getSkyWalkingDynamicField();
+        // The servers are expected to be joined with ';'.
+        assertThat(consumerEnhanceRequiredInfo.getBrokerServers(), is("localhost:9092;localhost:19092"));
+    }
+
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaConsumerInterceptorTest.java
new file mode 100644
index 000000000..06479bcda
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaConsumerInterceptorTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v11;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.helper.SegmentRefHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerEnhanceRequiredInfo;
+import org.apache.skywalking.apm.plugin.kafka.v1.KafkaConsumerInterceptor;
+import org.hamcrest.MatcherAssert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.KAFKA;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link KafkaConsumerInterceptor}. The interceptor is driven by calling <code>beforeMethod</code>
+ * and <code>afterMethod</code> directly, handing the record map over as the intercepted method's return value.
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class KafkaConsumerInterceptorTest {
+
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+
+    @Rule
+    public AgentServiceRule serviceRule = new AgentServiceRule();
+
+    private ConsumerEnhanceRequiredInfo requiredInfo;
+
+    private KafkaConsumerInterceptor consumerInterceptor;
+
+    // Enhanced consumer stub backed by the requiredInfo field of this test.
+    private EnhancedInstance consumerInstance = new EnhancedInstance() {
+        @Override public Object getSkyWalkingDynamicField() {
+            return requiredInfo;
+        }
+
+        @Override public void setSkyWalkingDynamicField(Object value) {
+            requiredInfo = (ConsumerEnhanceRequiredInfo)value;
+        }
+    };
+
+    private Map<TopicPartition, List<ConsumerRecord>> messages;
+
+    @Before
+    public void setUp() {
+        consumerInterceptor = new KafkaConsumerInterceptor();
+
+        requiredInfo = new ConsumerEnhanceRequiredInfo();
+        requiredInfo.setTopics(listOf("test", "test-1"));
+        requiredInfo.setBrokerServers(listOf("localhost:9092", "localhost:19092"));
+
+        messages = new HashMap<TopicPartition, List<ConsumerRecord>>();
+        messages.put(new TopicPartition("test", 1), recordsWithTraceHeader());
+    }
+
+    @Test
+    public void testConsumerWithoutMessage() throws Throwable {
+        intercept(new HashMap<TopicPartition, List<ConsumerRecord>>());
+
+        List<TraceSegment> collectedSegments = segmentStorage.getTraceSegments();
+        assertThat(collectedSegments.size(), is(0));
+    }
+
+    @Test
+    public void testConsumerWithMessage() throws Throwable {
+        intercept(messages);
+
+        List<TraceSegment> collectedSegments = segmentStorage.getTraceSegments();
+        assertThat(collectedSegments.size(), is(1));
+
+        TraceSegment segment = collectedSegments.get(0);
+        List<TraceSegmentRef> segmentRefs = segment.getRefs();
+        assertThat(segmentRefs.size(), is(1));
+        assertTraceSegmentRef(segmentRefs.get(0));
+
+        List<AbstractTracingSpan> spansOfSegment = SegmentHelper.getSpans(segment);
+        assertThat(spansOfSegment.size(), is(1));
+        assertConsumerSpan(spansOfSegment.get(0));
+    }
+
+    /**
+     * Calls the interceptor before and after an argument-less invocation that returns <code>polled</code>.
+     */
+    private void intercept(Object polled) throws Throwable {
+        consumerInterceptor.beforeMethod(consumerInstance, null, new Object[0], new Class[0], null);
+        consumerInterceptor.afterMethod(consumerInstance, null, new Object[0], new Class[0], polled);
+    }
+
+    /**
+     * Builds a mutable list holding the two given values in order.
+     */
+    private static List<String> listOf(String first, String second) {
+        List<String> values = new ArrayList<String>();
+        values.add(first);
+        values.add(second);
+        return values;
+    }
+
+    /**
+     * Builds a one-element record list; the record carries an <code>sw3</code> header.
+     */
+    private static List<ConsumerRecord> recordsWithTraceHeader() {
+        ConsumerRecord tracedRecord = new ConsumerRecord("test", 1, 0, "1", "1");
+        tracedRecord.headers().add("sw3", "1.234.111|3|1|1|#192.168.1.8:18002|#/portal/|#testEntrySpan|#AQA*#AQA*Et0We0tQNQA*".getBytes());
+
+        List<ConsumerRecord> records = new ArrayList<ConsumerRecord>();
+        records.add(tracedRecord);
+        return records;
+    }
+
+    private void assertConsumerSpan(AbstractTracingSpan span) {
+        SpanAssert.assertLayer(span, SpanLayer.MQ);
+        SpanAssert.assertComponent(span, KAFKA);
+        SpanAssert.assertTagSize(span, 2);
+        SpanAssert.assertTag(span, 0, "localhost:9092;localhost:19092");
+        SpanAssert.assertTag(span, 1, "test;test-1");
+    }
+
+    private void assertTraceSegmentRef(TraceSegmentRef ref) {
+        MatcherAssert.assertThat(SegmentRefHelper.getEntryApplicationInstanceId(ref), is(1));
+        MatcherAssert.assertThat(SegmentRefHelper.getSpanId(ref), is(3));
+        MatcherAssert.assertThat(SegmentRefHelper.getTraceSegmentId(ref).toString(), is("1.234.111"));
+    }
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaProducerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaProducerInterceptorTest.java
new file mode 100644
index 000000000..aff40ca32
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaProducerInterceptorTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v11;
+
+import java.util.List;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.apache.skywalking.apm.plugin.kafka.v1.KafkaProducerInterceptor;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.KAFKA;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Unit test for {@link KafkaProducerInterceptor}: runs one before/after interception with a
+ * stand-in producer and record, then checks the span found in the {@link SegmentStorage}.
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class KafkaProducerInterceptorTest {
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+
+    @Rule
+    public AgentServiceRule serviceRule = new AgentServiceRule();
+
+    private KafkaProducerInterceptor producerInterceptor;
+
+    private Object[] arguments;
+    private Class<?>[] argumentType;
+
+    /** Stand-in for the enhanced producer; its dynamic field is a fixed broker address. */
+    private EnhancedInstance kafkaProducerInstance = new EnhancedInstance() {
+        @Override public Object getSkyWalkingDynamicField() {
+            return "localhost:9092";
+        }
+
+        @Override public void setSkyWalkingDynamicField(Object value) {
+            // Fixed value; nothing to store.
+        }
+    };
+
+    private EnhancedInstance messageInstance = new MockProducerMessage();
+
+    /**
+     * A {@link ProducerRecord} for topic "test" that also acts as an enhanced instance whose
+     * dynamic field is "test". Declared static because it uses nothing from the enclosing test.
+     */
+    private static class MockProducerMessage extends ProducerRecord<String, String> implements EnhancedInstance {
+
+        MockProducerMessage() {
+            super("test", "");
+        }
+
+        @Override public Object getSkyWalkingDynamicField() {
+            return "test";
+        }
+
+        @Override public void setSkyWalkingDynamicField(Object value) {
+            // Fixed value; nothing to store.
+        }
+    }
+
+    @Before
+    public void setUp() {
+        producerInterceptor = new KafkaProducerInterceptor();
+
+        arguments = new Object[] {messageInstance, null};
+        // One type per argument slot. The second slot is assumed to be the send callback, as in
+        // KafkaProducer#send(ProducerRecord, Callback) - confirm against the instrumentation definition.
+        argumentType = new Class<?>[] {ProducerRecord.class, org.apache.kafka.clients.producer.Callback.class};
+    }
+
+    /** Expects exactly one stored segment containing exactly one span after a single send. */
+    @Test
+    public void testSendMessage() throws Throwable {
+        producerInterceptor.beforeMethod(kafkaProducerInstance, null, arguments, argumentType, null);
+        producerInterceptor.afterMethod(kafkaProducerInstance, null, arguments, argumentType, null);
+
+        List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
+        assertThat(traceSegmentList.size(), is(1));
+
+        TraceSegment segment = traceSegmentList.get(0);
+        List<AbstractTracingSpan> spans = SegmentHelper.getSpans(segment);
+        assertThat(spans.size(), is(1));
+
+        assertMessageSpan(spans.get(0));
+    }
+
+    /**
+     * Checks the producer span: tag 0 and tag 1 values, Kafka component, MQ layer and operation name.
+     *
+     * @param span the span to verify
+     */
+    private void assertMessageSpan(AbstractTracingSpan span) {
+        SpanAssert.assertTag(span, 0, "localhost:9092");
+        SpanAssert.assertTag(span, 1, "test");
+        SpanAssert.assertComponent(span, KAFKA);
+        SpanAssert.assertLayer(span, SpanLayer.MQ);
+        assertThat(span.getOperationName(), is("Kafka/test/Producer"));
+    }
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerConstructorInterceptorTest.java
new file mode 100644
index 000000000..6eee953f5
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerConstructorInterceptorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v11;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.plugin.kafka.v1.ProducerConstructorInterceptor;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link ProducerConstructorInterceptor}: the interceptor receives a mocked
+ * {@link ProducerConfig} and the value it stores on the enhanced instance is checked.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ProducerConstructorInterceptorTest {
+    @Mock
+    private ProducerConfig producerConfig;
+
+    // The real object under test, created in setUp(); it must not be a @Mock.
+    private ProducerConstructorInterceptor constructorInterceptor;
+
+    // Minimal EnhancedInstance that simply remembers the value handed to its setter.
+    private EnhancedInstance enhancedInstance = new EnhancedInstance() {
+        private String brokerServers;
+
+        @Override public Object getSkyWalkingDynamicField() {
+            return brokerServers;
+        }
+
+        @Override public void setSkyWalkingDynamicField(Object value) {
+            brokerServers = (String)value;
+        }
+    };
+
+    /** Stubs the config's "bootstrap.servers" list with two addresses and creates the interceptor. */
+    @Before
+    public void setUp() {
+        List<String> mockBootstrapServers = new ArrayList<String>();
+        mockBootstrapServers.add("localhost:9092");
+        mockBootstrapServers.add("localhost:19092");
+        when(producerConfig.getList("bootstrap.servers")).thenReturn(mockBootstrapServers);
+        constructorInterceptor = new ProducerConstructorInterceptor();
+    }
+
+    /** After construction the dynamic field should read "localhost:9092;localhost:19092". */
+    @Test
+    public void testOnConsumer() {
+        constructorInterceptor.onConstruct(enhancedInstance, new Object[] {producerConfig});
+        assertThat(enhancedInstance.getSkyWalkingDynamicField().toString(), is("localhost:9092;localhost:19092"));
+    }
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerRecordConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerRecordConstructorInterceptorTest.java
new file mode 100644
index 000000000..5a1a9d677
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerRecordConstructorInterceptorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v11;
+
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.plugin.kafka.v1.ProducerRecordConstructorInterceptor;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProducerRecordConstructorInterceptorTest {
+
+    @Mock
+    private ProducerRecordConstructorInterceptor constructorInterceptor;
+
+    private EnhancedInstance enhancedInstance = new EnhancedInstance() {
+        private String brokerServers;
+
+        @Override public Object getSkyWalkingDynamicField() {
+            return brokerServers;
+        }
+
+        @Override public void setSkyWalkingDynamicField(Object value) {
+            brokerServers = (String)value;
+        }
+    };
+
+    @Before
+    public void setUp() {
+        constructorInterceptor = new ProducerRecordConstructorInterceptor();
+    }
+
+    @Test
+    public void testOnConsumer() {
+        constructorInterceptor.onConstruct(enhancedInstance, new Object[] {"test"});
+        assertThat(enhancedInstance.getSkyWalkingDynamicField().toString(), is("test"));
+    }
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/SubscribeMethodInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/SubscribeMethodInterceptorTest.java
new file mode 100644
index 000000000..66a084293
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/SubscribeMethodInterceptorTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka.v11;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerEnhanceRequiredInfo;
+import org.apache.skywalking.apm.plugin.kafka.v1.SubscribeMethodInterceptor;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SubscribeMethodInterceptorTest {
+
+    @Mock
+    private SubscribeMethodInterceptor constructorInterceptor;
+
+    private List<String> mockTopics = new ArrayList<String>();
+
+    private EnhancedInstance enhancedInstance = new EnhancedInstance() {
+        ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo = new ConsumerEnhanceRequiredInfo();
+
+        @Override public Object getSkyWalkingDynamicField() {
+            return consumerEnhanceRequiredInfo;
+        }
+
+        @Override public void setSkyWalkingDynamicField(Object value) {
+            this.consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo)value;
+        }
+    };
+
+    @Before
+    public void setUp() {
+        mockTopics.add("test");
+        mockTopics.add("test-1");
+        constructorInterceptor = new SubscribeMethodInterceptor();
+    }
+
+    @Test
+    public void testOnConsumer() throws Throwable {
+        constructorInterceptor.beforeMethod(enhancedInstance, null, new Object[] {mockTopics}, new Class[] {Collection.class}, null);
+        ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)enhancedInstance.getSkyWalkingDynamicField();
+        assertThat(requiredInfo.getTopics(), is("test;test-1"));
+    }
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml
index dba83ca90..095740230 100644
--- a/apm-sniffer/apm-sdk-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/pom.xml
@@ -53,6 +53,7 @@
         <module>elastic-job-2.x-plugin</module>
         <module>mongodb-2.x-plugin</module>
         <module>httpasyncclient-4.x-plugin</module>
+        <module>kafka-v1-plugin</module>
     </modules>
     <packaging>pom</packaging>
 
diff --git a/apm-sniffer/apm-test-tools/src/main/java/org/apache/skywalking/apm/agent/core/context/MockContextSnapshot.java b/apm-sniffer/apm-test-tools/src/main/java/org/apache/skywalking/apm/agent/core/context/MockContextSnapshot.java
new file mode 100644
index 000000000..afc4f8a51
--- /dev/null
+++ b/apm-sniffer/apm-test-tools/src/main/java/org/apache/skywalking/apm/agent/core/context/MockContextSnapshot.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.context;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.skywalking.apm.agent.core.context.ids.DistributedTraceId;
+import org.apache.skywalking.apm.agent.core.context.ids.ID;
+import org.apache.skywalking.apm.agent.core.context.ids.NewDistributedTraceId;
+
+public enum MockContextSnapshot {
+    INSTANCE;
+
+    private ContextSnapshot contextSnapshot;
+
+    MockContextSnapshot() {
+        List<DistributedTraceId> distributedTraceIds = new ArrayList<DistributedTraceId>();
+        distributedTraceIds.add(new NewDistributedTraceId());
+
+        contextSnapshot = new ContextSnapshot(new ID(1, 2, 3), 1, distributedTraceIds);
+        contextSnapshot.setEntryApplicationInstanceId(1);
+        contextSnapshot.setEntryOperationId(0);
+        contextSnapshot.setEntryOperationName("/for-test-entryOperationName");
+        contextSnapshot.setParentOperationId(0);
+        contextSnapshot.setParentOperationName("/for-test-parentOperationName");
+    }
+
+    public ContextSnapshot mockContextSnapshot() {
+        return contextSnapshot;
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services