You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/04/29 02:22:50 UTC
[skywalking] branch master updated: Fix #6835 Add compatibility with Kafka client version 2.8.x (#6837)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new e9fc6bc Fix #6835 Add compatibility with Kafka client version 2.8.x (#6837)
e9fc6bc is described below
commit e9fc6bc639dba4c0dc517d9d65474c1aa4b39598
Author: divyakumarjain <di...@users.noreply.github.com>
AuthorDate: Thu Apr 29 07:52:30 2021 +0530
Fix #6835 Add compatibility with Kafka client version 2.8.x (#6837)
---
CHANGES.md | 1 +
...java => AbstractConstructorInterceptPoint.java} | 17 ++-----
...nstructorWithConsumerConfigInterceptPoint.java} | 22 ++++-----
.../kafka/ConstructorWithMapInterceptPoint.java | 53 ++++++++++++++++++++++
.../kafka/define/KafkaConsumerInstrumentation.java | 20 ++++++--
...uctorWithConsumerConfigInterceptPointTest.java} | 14 +++---
...a => ConstructorWithMapInterceptPointTest.java} | 20 ++++----
.../service-agent/java-agent/Supported-list.md | 2 +-
.../scenarios/kafka-scenario/support-version.list | 2 +
9 files changed, 104 insertions(+), 47 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index ddd2224..b4d8f7e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -16,6 +16,7 @@ Release Notes.
* resolve agent has no retries if connect kafka cluster failed when bootstrap
* Add Seata in the component definition. Seata plugin hosts on Seata project.
* Extended Kafka plugin to properly trace consumers that have topic partitions directly assigned.
+* Support Kafka consumer 2.8.0.
* Support print SkyWalking context to logs.
* Add `MessageListener` enhancement in pulsar plugin.
* fix a bug that spring-mvc set an error endpoint name if the controller class annotation implements an interface.
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/AbstractConstructorInterceptPoint.java
similarity index 65%
copy from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
copy to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/AbstractConstructorInterceptPoint.java
index d0fa9e8..fed0494 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/AbstractConstructorInterceptPoint.java
@@ -18,22 +18,15 @@
package org.apache.skywalking.apm.plugin.kafka;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
-/**
- *
- **/
-public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
+public abstract class AbstractConstructorInterceptPoint<T> implements InstanceConstructorInterceptor {
- @Override
- public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
- ConsumerConfig config = (ConsumerConfig) allArguments[0];
- // set the bootstrap server address
- ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
- requiredInfo.setBrokerServers(config.getList("bootstrap.servers"));
- requiredInfo.setGroupId(config.getString("group.id"));
+ @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+ ConsumerEnhanceRequiredInfo requiredInfo = resolveConsumerEnhanceRequiredInfo((T) allArguments[0]);
objInst.setSkyWalkingDynamicField(requiredInfo);
}
+
+ protected abstract ConsumerEnhanceRequiredInfo resolveConsumerEnhanceRequiredInfo(T allArgument);
}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithConsumerConfigInterceptPoint.java
similarity index 60%
rename from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithConsumerConfigInterceptPoint.java
index d0fa9e8..65c091c 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithConsumerConfigInterceptPoint.java
@@ -19,21 +19,19 @@
package org.apache.skywalking.apm.plugin.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
-/**
- *
- **/
-public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
+public class ConstructorWithConsumerConfigInterceptPoint extends AbstractConstructorInterceptPoint<ConsumerConfig> {
@Override
- public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
- ConsumerConfig config = (ConsumerConfig) allArguments[0];
- // set the bootstrap server address
+ protected ConsumerEnhanceRequiredInfo resolveConsumerEnhanceRequiredInfo(ConsumerConfig configArgument) {
ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
- requiredInfo.setBrokerServers(config.getList("bootstrap.servers"));
- requiredInfo.setGroupId(config.getString("group.id"));
- objInst.setSkyWalkingDynamicField(requiredInfo);
+
+ if (configArgument != null) {
+ // set the bootstrap server address
+ requiredInfo.setBrokerServers(configArgument.getList("bootstrap.servers"));
+ requiredInfo.setGroupId(configArgument.getString("group.id"));
+ }
+
+ return requiredInfo;
}
}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithMapInterceptPoint.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithMapInterceptPoint.java
new file mode 100644
index 0000000..a27c384
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithMapInterceptPoint.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ *
+ **/
+public class ConstructorWithMapInterceptPoint extends AbstractConstructorInterceptPoint<Map<String, ?>> {
+
+ protected ConsumerEnhanceRequiredInfo resolveConsumerEnhanceRequiredInfo(Map<String, ?> configArgument) {
+ ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
+
+ if (configArgument != null) {
+ // set the bootstrap server address
+ requiredInfo.setBrokerServers(convertToList(configArgument.get("bootstrap.servers")));
+ requiredInfo.setGroupId((String) configArgument.get("group.id"));
+ }
+
+ return requiredInfo;
+ }
+
+ private List<String> convertToList(Object value) {
+ if (value instanceof List)
+ return (List<String>) value;
+ else if (value instanceof String) {
+ return Arrays.stream(((String) value).split(",")).collect(Collectors.toList());
+ }
+
+ return Collections.emptyList();
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
index b2d1066..507e367 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
@@ -41,7 +41,9 @@ import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName
public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerConfig";
- public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConsumerConstructorInterceptor";
+ public static final String CONSTRUCTOR_INTERCEPT_MAP_TYPE = "java.util.Map";
+ public static final String CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithConsumerConfigInterceptPoint";
+ public static final String MAP_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithMapInterceptPoint";
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor";
public static final String ENHANCE_METHOD = "pollOnce";
public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
@@ -65,9 +67,21 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
@Override
public String getConstructorInterceptor() {
- return CONSTRUCTOR_INTERCEPTOR_CLASS;
+ return CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS;
}
- }
+ },
+ new ConstructorInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription> getConstructorMatcher() {
+ return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_MAP_TYPE);
+ }
+
+ @Override
+ public String getConstructorInterceptor() {
+ return MAP_CONSTRUCTOR_INTERCEPTOR_CLASS;
+ }
+ },
+
};
}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithConsumerConfigInterceptPointTest.java
similarity index 83%
copy from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java
copy to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithConsumerConfigInterceptPointTest.java
index d6c2b43..5ac604b 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithConsumerConfigInterceptPointTest.java
@@ -34,15 +34,15 @@ import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class ConsumerConstructorInterceptorTest {
+public class ConstructorWithConsumerConfigInterceptPointTest {
@Mock
- private ConsumerConfig consumerConfig;
+ private ConsumerConfig consumerConfig;
@Mock
- private ConsumerConstructorInterceptor constructorInterceptor;
+ private ConstructorWithConsumerConfigInterceptPoint constructorInterceptor;
- private EnhancedInstance enhancedInstance = new EnhancedInstance() {
+ private EnhancedInstance enhancedInstance = new EnhancedInstance() {
private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
@Override
@@ -58,12 +58,12 @@ public class ConsumerConstructorInterceptorTest {
@Before
public void setUp() {
- List<String> mockBootstrapServers = new ArrayList<String>();
+ List<String> mockBootstrapServers = new ArrayList<>();
mockBootstrapServers.add("localhost:9092");
mockBootstrapServers.add("localhost:19092");
when(consumerConfig.getList("bootstrap.servers")).thenReturn(mockBootstrapServers);
- constructorInterceptor = new ConsumerConstructorInterceptor();
+ constructorInterceptor = new ConstructorWithConsumerConfigInterceptPoint();
}
@Test
@@ -73,4 +73,4 @@ public class ConsumerConstructorInterceptorTest {
assertThat(consumerEnhanceRequiredInfo.getBrokerServers(), is("localhost:9092;localhost:19092"));
}
-}
\ No newline at end of file
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithMapInterceptPointTest.java
similarity index 78%
rename from apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java
rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithMapInterceptPointTest.java
index d6c2b43..2b3c4b8 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConstructorWithMapInterceptPointTest.java
@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.plugin.kafka;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.junit.Before;
import org.junit.Test;
@@ -26,21 +25,20 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class ConsumerConstructorInterceptorTest {
+public class ConstructorWithMapInterceptPointTest {
@Mock
- private ConsumerConfig consumerConfig;
+ private Map<String, String> consumerConfig;
@Mock
- private ConsumerConstructorInterceptor constructorInterceptor;
+ private ConstructorWithMapInterceptPoint constructorInterceptor;
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
@@ -58,12 +56,10 @@ public class ConsumerConstructorInterceptorTest {
@Before
public void setUp() {
- List<String> mockBootstrapServers = new ArrayList<String>();
- mockBootstrapServers.add("localhost:9092");
- mockBootstrapServers.add("localhost:19092");
- when(consumerConfig.getList("bootstrap.servers")).thenReturn(mockBootstrapServers);
+ String mockBootstrapServers = "localhost:9092,localhost:19092";
+ when(consumerConfig.get("bootstrap.servers")).thenReturn(mockBootstrapServers);
- constructorInterceptor = new ConsumerConstructorInterceptor();
+ constructorInterceptor = new ConstructorWithMapInterceptPoint();
}
@Test
@@ -73,4 +69,4 @@ public class ConsumerConstructorInterceptorTest {
assertThat(consumerEnhanceRequiredInfo.getBrokerServers(), is("localhost:9092;localhost:19092"));
}
-}
\ No newline at end of file
+}
diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md
index c8d957c..0fa090c 100644
--- a/docs/en/setup/service-agent/java-agent/Supported-list.md
+++ b/docs/en/setup/service-agent/java-agent/Supported-list.md
@@ -58,7 +58,7 @@ metrics based on the tracing data.
* [Apache CXF](https://github.com/apache/cxf) 3.x
* MQ
* [RocketMQ](https://github.com/apache/rocketmq) 4.x
- * [Kafka](http://kafka.apache.org) 0.11.0.0 -> 2.6.1
+ * [Kafka](http://kafka.apache.org) 0.11.0.0 -> 2.8.0
* [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended by [the official document](https://spring.io/projects/spring-kafka))
* [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4
* [RabbitMQ](https://www.rabbitmq.com/) 5.x
diff --git a/test/plugin/scenarios/kafka-scenario/support-version.list b/test/plugin/scenarios/kafka-scenario/support-version.list
index fdfa2cb..51b8ffd 100644
--- a/test/plugin/scenarios/kafka-scenario/support-version.list
+++ b/test/plugin/scenarios/kafka-scenario/support-version.list
@@ -23,3 +23,5 @@
2.3.0
2.5.1
2.6.1
+2.7.0
+2.8.0