You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/04/29 02:22:50 UTC

[skywalking] branch master updated: Fix #6835 Add compatibility with Kafka client version 2.8.x (#6837)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new e9fc6bc  Fix #6835 Add compatibility with Kafka client version 2.8.x (#6837)
e9fc6bc is described below

commit e9fc6bc639dba4c0dc517d9d65474c1aa4b39598
Author: divyakumarjain <di...@users.noreply.github.com>
AuthorDate: Thu Apr 29 07:52:30 2021 +0530

    Fix #6835 Add compatibility with Kafka client version 2.8.x (#6837)
---
 CHANGES.md                                         |  1 +
 ...java => AbstractConstructorInterceptPoint.java} | 17 ++-----
 ...nstructorWithConsumerConfigInterceptPoint.java} | 22 ++++-----
 .../kafka/ConstructorWithMapInterceptPoint.java    | 53 ++++++++++++++++++++++
 .../kafka/define/KafkaConsumerInstrumentation.java | 20 ++++++--
 ...uctorWithConsumerConfigInterceptPointTest.java} | 14 +++---
 ...a => ConstructorWithMapInterceptPointTest.java} | 20 ++++----
 .../service-agent/java-agent/Supported-list.md     |  2 +-
 .../scenarios/kafka-scenario/support-version.list  |  2 +
 9 files changed, 104 insertions(+), 47 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ddd2224..b4d8f7e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -16,6 +16,7 @@ Release Notes.
 * resolve agent has no retries if connect kafka cluster failed when bootstrap
 * Add Seata in the component definition. Seata plugin hosts on Seata project.
 * Extended Kafka plugin to properly trace consumers that have topic partitions directly assigned.
+* Support Kafka consumer 2.8.0.
 * Support print SkyWalking context to logs.
 * Add `MessageListener` enhancement in pulsar plugin.
 * fix a bug that spring-mvc set an error endpoint name if the controller class annotation implements an interface.
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/AbstractConstructorInterceptPoint.java
similarity index 65%
copy from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
copy to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/AbstractConstructorInterceptPoint.java
index d0fa9e8..fed0494 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/AbstractConstructorInterceptPoint.java
@@ -18,22 +18,15 @@
 
 package org.apache.skywalking.apm.plugin.kafka;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
 
-/**
- *
- **/
-public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
+public abstract class AbstractConstructorInterceptPoint<T> implements InstanceConstructorInterceptor {
 
-    @Override
-    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
-        ConsumerConfig config = (ConsumerConfig) allArguments[0];
-        // set the bootstrap server address
-        ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
-        requiredInfo.setBrokerServers(config.getList("bootstrap.servers"));
-        requiredInfo.setGroupId(config.getString("group.id"));
+    @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        ConsumerEnhanceRequiredInfo requiredInfo = resolveConsumerEnhanceRequiredInfo((T) allArguments[0]);
         objInst.setSkyWalkingDynamicField(requiredInfo);
     }
+
+    protected abstract ConsumerEnhanceRequiredInfo resolveConsumerEnhanceRequiredInfo(T allArgument);
 }
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithConsumerConfigInterceptPoint.java
similarity index 60%
rename from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithConsumerConfigInterceptPoint.java
index d0fa9e8..65c091c 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithConsumerConfigInterceptPoint.java
@@ -19,21 +19,19 @@
 package org.apache.skywalking.apm.plugin.kafka;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
 
-/**
- *
- **/
-public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
+public class ConstructorWithConsumerConfigInterceptPoint extends AbstractConstructorInterceptPoint<ConsumerConfig> {
 
     @Override
-    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
-        ConsumerConfig config = (ConsumerConfig) allArguments[0];
-        // set the bootstrap server address
+    protected ConsumerEnhanceRequiredInfo resolveConsumerEnhanceRequiredInfo(ConsumerConfig configArgument) {
         ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
-        requiredInfo.setBrokerServers(config.getList("bootstrap.servers"));
-        requiredInfo.setGroupId(config.getString("group.id"));
-        objInst.setSkyWalkingDynamicField(requiredInfo);
+
+        if (configArgument != null) {
+            // set the bootstrap server address
+            requiredInfo.setBrokerServers(configArgument.getList("bootstrap.servers"));
+            requiredInfo.setGroupId(configArgument.getString("group.id"));
+        }
+
+        return requiredInfo;
     }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithMapInterceptPoint.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithMapInterceptPoint.java
new file mode 100644
index 0000000..a27c384
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithMapInterceptPoint.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ *
+ **/
+public class ConstructorWithMapInterceptPoint extends AbstractConstructorInterceptPoint<Map<String, ?>> {
+
+    protected ConsumerEnhanceRequiredInfo resolveConsumerEnhanceRequiredInfo(Map<String, ?> configArgument) {
+        ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
+
+        if (configArgument != null) {
+            // set the bootstrap server address
+            requiredInfo.setBrokerServers(convertToList(configArgument.get("bootstrap.servers")));
+            requiredInfo.setGroupId((String) configArgument.get("group.id"));
+        }
+
+        return requiredInfo;
+    }
+
+    private List<String> convertToList(Object value) {
+        if (value instanceof List)
+            return (List<String>) value;
+        else if (value instanceof String) {
+            return Arrays.stream(((String) value).split(",")).collect(Collectors.toList());
+        }
+
+        return Collections.emptyList();
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
index b2d1066..507e367 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
@@ -41,7 +41,9 @@ import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName
 public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
 
     public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerConfig";
-    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConsumerConstructorInterceptor";
+    public static final String CONSTRUCTOR_INTERCEPT_MAP_TYPE = "java.util.Map";
+    public static final String CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithConsumerConfigInterceptPoint";
+    public static final String MAP_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithMapInterceptPoint";
     public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor";
     public static final String ENHANCE_METHOD = "pollOnce";
     public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
@@ -65,9 +67,21 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
 
                 @Override
                 public String getConstructorInterceptor() {
-                    return CONSTRUCTOR_INTERCEPTOR_CLASS;
+                    return CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS;
                 }
-            }
+            },
+              new ConstructorInterceptPoint() {
+                  @Override
+                  public ElementMatcher<MethodDescription> getConstructorMatcher() {
+                      return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_MAP_TYPE);
+                  }
+
+                @Override
+                public String getConstructorInterceptor() {
+                    return MAP_CONSTRUCTOR_INTERCEPTOR_CLASS;
+                }
+            },
+
         };
     }
 
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithConsumerConfigInterceptPointTest.java
similarity index 83%
copy from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java
copy to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithConsumerConfigInterceptPointTest.java
index d6c2b43..5ac604b 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithConsumerConfigInterceptPointTest.java
@@ -34,15 +34,15 @@ import static org.hamcrest.core.Is.is;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public class ConsumerConstructorInterceptorTest {
+public class ConstructorWithConsumerConfigInterceptPointTest {
 
     @Mock
-    private ConsumerConfig consumerConfig;
+    private ConsumerConfig                              consumerConfig;
 
     @Mock
-    private ConsumerConstructorInterceptor constructorInterceptor;
+    private ConstructorWithConsumerConfigInterceptPoint constructorInterceptor;
 
-    private EnhancedInstance enhancedInstance = new EnhancedInstance() {
+    private EnhancedInstance                            enhancedInstance = new EnhancedInstance() {
         private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
 
         @Override
@@ -58,12 +58,12 @@ public class ConsumerConstructorInterceptorTest {
 
     @Before
     public void setUp() {
-        List<String> mockBootstrapServers = new ArrayList<String>();
+        List<String> mockBootstrapServers = new ArrayList<>();
         mockBootstrapServers.add("localhost:9092");
         mockBootstrapServers.add("localhost:19092");
         when(consumerConfig.getList("bootstrap.servers")).thenReturn(mockBootstrapServers);
 
-        constructorInterceptor = new ConsumerConstructorInterceptor();
+        constructorInterceptor = new ConstructorWithConsumerConfigInterceptPoint();
     }
 
     @Test
@@ -73,4 +73,4 @@ public class ConsumerConstructorInterceptorTest {
         assertThat(consumerEnhanceRequiredInfo.getBrokerServers(), is("localhost:9092;localhost:19092"));
     }
 
-}
\ No newline at end of file
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithMapInterceptPointTest.java
similarity index 78%
rename from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithMapInterceptPointTest.java
index d6c2b43..2b3c4b8 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithMapInterceptPointTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.skywalking.apm.plugin.kafka;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import org.junit.Before;
 import org.junit.Test;
@@ -26,21 +25,20 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Map;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public class ConsumerConstructorInterceptorTest {
+public class ConstructorWithMapInterceptPointTest {
 
     @Mock
-    private ConsumerConfig consumerConfig;
+    private Map<String, String> consumerConfig;
 
     @Mock
-    private ConsumerConstructorInterceptor constructorInterceptor;
+    private ConstructorWithMapInterceptPoint constructorInterceptor;
 
     private EnhancedInstance enhancedInstance = new EnhancedInstance() {
         private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
@@ -58,12 +56,10 @@ public class ConsumerConstructorInterceptorTest {
 
     @Before
     public void setUp() {
-        List<String> mockBootstrapServers = new ArrayList<String>();
-        mockBootstrapServers.add("localhost:9092");
-        mockBootstrapServers.add("localhost:19092");
-        when(consumerConfig.getList("bootstrap.servers")).thenReturn(mockBootstrapServers);
+        String mockBootstrapServers = "localhost:9092,localhost:19092";
+        when(consumerConfig.get("bootstrap.servers")).thenReturn(mockBootstrapServers);
 
-        constructorInterceptor = new ConsumerConstructorInterceptor();
+        constructorInterceptor = new ConstructorWithMapInterceptPoint();
     }
 
     @Test
@@ -73,4 +69,4 @@ public class ConsumerConstructorInterceptorTest {
         assertThat(consumerEnhanceRequiredInfo.getBrokerServers(), is("localhost:9092;localhost:19092"));
     }
 
-}
\ No newline at end of file
+}
diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md
index c8d957c..0fa090c 100644
--- a/docs/en/setup/service-agent/java-agent/Supported-list.md
+++ b/docs/en/setup/service-agent/java-agent/Supported-list.md
@@ -58,7 +58,7 @@ metrics based on the tracing data.
   * [Apache CXF](https://github.com/apache/cxf) 3.x
 * MQ
   * [RocketMQ](https://github.com/apache/rocketmq) 4.x
-  * [Kafka](http://kafka.apache.org) 0.11.0.0 -> 2.6.1
+  * [Kafka](http://kafka.apache.org) 0.11.0.0 -> 2.8.0
   * [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended by [the official document](https://spring.io/projects/spring-kafka))
   * [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4
   * [RabbitMQ](https://www.rabbitmq.com/) 5.x
diff --git a/test/plugin/scenarios/kafka-scenario/support-version.list b/test/plugin/scenarios/kafka-scenario/support-version.list
index fdfa2cb..51b8ffd 100644
--- a/test/plugin/scenarios/kafka-scenario/support-version.list
+++ b/test/plugin/scenarios/kafka-scenario/support-version.list
@@ -23,3 +23,5 @@
 2.3.0
 2.5.1
 2.6.1
+2.7.0
+2.8.0