You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/06/11 09:55:20 UTC
[skywalking] branch master updated: fix Kafka consumer subscribe
topics from pattern (#4873)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new e2b04b5 fix Kafka consumer subscribe topics from pattern (#4873)
e2b04b5 is described below
commit e2b04b58b1fc324109cf3e26d355ad7a33e59ce9
Author: denis <am...@qq.com>
AuthorDate: Thu Jun 11 17:55:04 2020 +0800
fix Kafka consumer subscribe topics from pattern (#4873)
---
.../plugin/kafka/SubscribeMethodInterceptor.java | 14 +++++--
.../kafka/define/KafkaConsumerInstrumentation.java | 23 ++++++++++-
.../kafka/SubscribeMethodInterceptorTest.java | 14 ++++++-
.../kafka-scenario/config/expectedData.yaml | 21 ++++++++++
.../testcase/kafka/controller/CaseController.java | 48 ++++++++++++++++++++++
5 files changed, 113 insertions(+), 7 deletions(-)
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java
index ab944bd..039509c 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java
@@ -20,24 +20,32 @@ package org.apache.skywalking.apm.plugin.kafka;
import java.lang.reflect.Method;
import java.util.Collection;
+import java.util.Collections;
+import java.util.regex.Pattern;
+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
public class SubscribeMethodInterceptor implements InstanceMethodsAroundInterceptor {
+ @SuppressWarnings("unchecked")
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
- MethodInterceptResult result) throws Throwable {
+ MethodInterceptResult result) {
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
- requiredInfo.setTopics((Collection<String>) allArguments[0]);
+ if (argumentsTypes[0] == Pattern.class) {
+ requiredInfo.setTopics(Collections.singletonList(((Pattern) allArguments[0]).pattern()));
+ } else {
+ requiredInfo.setTopics((Collection<String>) allArguments[0]);
+ }
objInst.setSkyWalkingDynamicField(requiredInfo);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
- Object ret) throws Throwable {
+ Object ret) {
return ret;
}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
index 4d9e8ce..5517351 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
@@ -47,7 +47,8 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
public static final String SUBSCRIBE_METHOD = "subscribe";
- public static final String SUBSCRIBE_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerRebalanceListener";
+ public static final String SUBSCRIBE_INTERCEPT_TYPE_PATTERN = "java.util.regex.Pattern";
+ public static final String SUBSCRIBE_INTERCEPT_TYPE_NAME = "java.util.Collection";
public static final String SUBSCRIBE_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.kafka.SubscribeMethodInterceptor";
@Override
@@ -89,7 +90,25 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
- return named(SUBSCRIBE_METHOD).and(takesArgumentWithType(1, SUBSCRIBE_INTERCEPT_TYPE));
+ return named(SUBSCRIBE_METHOD)
+ .and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_NAME));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return SUBSCRIBE_INTERCEPT_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ },
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription> getMethodsMatcher() {
+ return named(SUBSCRIBE_METHOD)
+ .and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_PATTERN));
}
@Override
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java
index da7c952..b221a4e 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java
@@ -28,6 +28,7 @@ import org.mockito.runners.MockitoJUnitRunner;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.regex.Pattern;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -38,7 +39,9 @@ public class SubscribeMethodInterceptorTest {
@Mock
private SubscribeMethodInterceptor constructorInterceptor;
- private List<String> mockTopics = new ArrayList<String>();
+ private List<String> mockTopics = new ArrayList<>();
+
+ private Pattern mockTopicPattern = Pattern.compile("test-.*");
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo = new ConsumerEnhanceRequiredInfo();
@@ -62,9 +65,16 @@ public class SubscribeMethodInterceptorTest {
}
@Test
- public void testOnConsumer() throws Throwable {
+ public void testOnConsumer() {
constructorInterceptor.beforeMethod(enhancedInstance, null, new Object[] {mockTopics}, new Class[] {Collection.class}, null);
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
assertThat(requiredInfo.getTopics(), is("test;test-1"));
}
+
+ @Test
+ public void testSubscribeForPattern() {
+ constructorInterceptor.beforeMethod(enhancedInstance, null, new Object[] {mockTopicPattern}, new Class[] {Pattern.class}, null);
+ ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
+ assertThat(requiredInfo.getTopics(), is("test-.*"));
+ }
}
\ No newline at end of file
diff --git a/test/plugin/scenarios/kafka-scenario/config/expectedData.yaml b/test/plugin/scenarios/kafka-scenario/config/expectedData.yaml
index 2e5da2a..1dee9ea 100644
--- a/test/plugin/scenarios/kafka-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/kafka-scenario/config/expectedData.yaml
@@ -125,3 +125,24 @@ segmentItems:
parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: 'kafka-scenario', traceId: not null}
skipAnalysis: 'false'
+ - segmentId: not null
+ spans:
+ - operationName: Kafka/test./Consumer/testGroup2
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 41
+ isError: false
+ spanType: Entry
+ peer: ''
+ tags:
+ - {key: mq.broker, value: 'kafka-server:9092'}
+ - {key: mq.topic, value: test.}
+ refs:
+ - {parentEndpoint: /case/kafka-case, networkAddress: 'kafka-server:9092', refType: CrossProcess,
+ parentSpanId: 2, parentTraceSegmentId: not null, parentServiceInstance: not
+ null, parentService: 'kafka-scenario', traceId: not null}
+ skipAnalysis: 'false'
diff --git a/test/plugin/scenarios/kafka-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/kafka/controller/CaseController.java b/test/plugin/scenarios/kafka-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/kafka/controller/CaseController.java
index e787c01..1e53178 100644
--- a/test/plugin/scenarios/kafka-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/kafka/controller/CaseController.java
+++ b/test/plugin/scenarios/kafka-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/kafka/controller/CaseController.java
@@ -21,10 +21,12 @@ package test.org.apache.skywalking.apm.testcase.kafka.controller;
import java.util.Arrays;
import java.util.Properties;
import java.util.function.Consumer;
+import java.util.regex.Pattern;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -54,6 +56,7 @@ public class CaseController {
private String topicName;
private String topicName2;
+ private Pattern topicPattern;
private static volatile boolean KAFKA_STATUS = false;
@@ -61,6 +64,7 @@ public class CaseController {
private void setUp() {
topicName = "test";
topicName2 = "test2";
+ topicPattern = Pattern.compile("test.");
new CheckKafkaProducerThread(bootstrapServers).start();
}
@@ -84,10 +88,15 @@ public class CaseController {
};
producer.send(record2, callback2);
}, bootstrapServers);
+
Thread thread = new ConsumerThread();
thread.start();
+
+ Thread thread2 = new ConsumerThread2();
+ thread2.start();
try {
thread.join();
+ thread2.join();
} catch (InterruptedException e) {
// ignore
}
@@ -200,5 +209,44 @@ public class CaseController {
consumer.close();
}
}
+
+ public class ConsumerThread2 extends Thread {
+ @Override
+ public void run() {
+ Properties consumerProperties = new Properties();
+ consumerProperties.put("bootstrap.servers", bootstrapServers);
+ consumerProperties.put("group.id", "testGroup2");
+ consumerProperties.put("enable.auto.commit", "true");
+ consumerProperties.put("auto.commit.interval.ms", "1000");
+ consumerProperties.put("auto.offset.reset", "earliest");
+ consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
+ consumer.subscribe(topicPattern, new NoOpConsumerRebalanceListener());
+ int i = 0;
+ while (i++ <= 10) {
+ try {
+ Thread.sleep(1 * 1000);
+ } catch (InterruptedException e) {
+ }
+
+ ConsumerRecords<String, String> records = consumer.poll(100);
+
+ if (!records.isEmpty()) {
+ for (ConsumerRecord<String, String> record : records) {
+ logger.info("header: {}", new String(record.headers()
+ .headers("TEST")
+ .iterator()
+ .next()
+ .value()));
+ logger.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
+ }
+ break;
+ }
+ }
+
+ consumer.close();
+ }
+ }
}