You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text copy.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/06/11 09:55:20 UTC

[skywalking] branch master updated: fix Kafka consumer subscribe topics from pattern (#4873)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new e2b04b5  fix Kafka consumer subscribe topics from pattern (#4873)
e2b04b5 is described below

commit e2b04b58b1fc324109cf3e26d355ad7a33e59ce9
Author: denis <am...@qq.com>
AuthorDate: Thu Jun 11 17:55:04 2020 +0800

    fix Kafka consumer subscribe topics from pattern (#4873)
---
 .../plugin/kafka/SubscribeMethodInterceptor.java   | 14 +++++--
 .../kafka/define/KafkaConsumerInstrumentation.java | 23 ++++++++++-
 .../kafka/SubscribeMethodInterceptorTest.java      | 14 ++++++-
 .../kafka-scenario/config/expectedData.yaml        | 21 ++++++++++
 .../testcase/kafka/controller/CaseController.java  | 48 ++++++++++++++++++++++
 5 files changed, 113 insertions(+), 7 deletions(-)

diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java
index ab944bd..039509c 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java
@@ -20,24 +20,32 @@ package org.apache.skywalking.apm.plugin.kafka;
 
 import java.lang.reflect.Method;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.regex.Pattern;
+
 import org.apache.skywalking.apm.agent.core.context.ContextManager;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
 
 public class SubscribeMethodInterceptor implements InstanceMethodsAroundInterceptor {
+    @SuppressWarnings("unchecked")
     @Override
     public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
-        MethodInterceptResult result) throws Throwable {
+        MethodInterceptResult result) {
         ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
-        requiredInfo.setTopics((Collection<String>) allArguments[0]);
+        if (argumentsTypes[0] == Pattern.class) {
+            requiredInfo.setTopics(Collections.singletonList(((Pattern) allArguments[0]).pattern()));
+        } else {
+            requiredInfo.setTopics((Collection<String>) allArguments[0]);
+        }
 
         objInst.setSkyWalkingDynamicField(requiredInfo);
     }
 
     @Override
     public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
-        Object ret) throws Throwable {
+        Object ret) {
         return ret;
     }
 
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
index 4d9e8ce..5517351 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
@@ -47,7 +47,8 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
     public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
     public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
     public static final String SUBSCRIBE_METHOD = "subscribe";
-    public static final String SUBSCRIBE_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerRebalanceListener";
+    public static final String SUBSCRIBE_INTERCEPT_TYPE_PATTERN = "java.util.regex.Pattern";
+    public static final String SUBSCRIBE_INTERCEPT_TYPE_NAME = "java.util.Collection";
     public static final String SUBSCRIBE_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.kafka.SubscribeMethodInterceptor";
 
     @Override
@@ -89,7 +90,25 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
             new InstanceMethodsInterceptPoint() {
                 @Override
                 public ElementMatcher<MethodDescription> getMethodsMatcher() {
-                    return named(SUBSCRIBE_METHOD).and(takesArgumentWithType(1, SUBSCRIBE_INTERCEPT_TYPE));
+                    return named(SUBSCRIBE_METHOD)
+                      .and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_NAME));
+                }
+
+                @Override
+                public String getMethodsInterceptor() {
+                    return SUBSCRIBE_INTERCEPT_CLASS;
+                }
+
+                @Override
+                public boolean isOverrideArgs() {
+                    return false;
+                }
+            },
+            new InstanceMethodsInterceptPoint() {
+                @Override
+                public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return named(SUBSCRIBE_METHOD)
+                      .and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_PATTERN));
                 }
 
                 @Override
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java
index da7c952..b221a4e 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java
@@ -28,6 +28,7 @@ import org.mockito.runners.MockitoJUnitRunner;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.regex.Pattern;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -38,7 +39,9 @@ public class SubscribeMethodInterceptorTest {
     @Mock
     private SubscribeMethodInterceptor constructorInterceptor;
 
-    private List<String> mockTopics = new ArrayList<String>();
+    private List<String> mockTopics = new ArrayList<>();
+
+    private Pattern mockTopicPattern = Pattern.compile("test-.*");
 
     private EnhancedInstance enhancedInstance = new EnhancedInstance() {
         ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo = new ConsumerEnhanceRequiredInfo();
@@ -62,9 +65,16 @@ public class SubscribeMethodInterceptorTest {
     }
 
     @Test
-    public void testOnConsumer() throws Throwable {
+    public void testOnConsumer() {
         constructorInterceptor.beforeMethod(enhancedInstance, null, new Object[] {mockTopics}, new Class[] {Collection.class}, null);
         ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
         assertThat(requiredInfo.getTopics(), is("test;test-1"));
     }
+
+    @Test
+    public void testSubscribeForPattern() {
+        constructorInterceptor.beforeMethod(enhancedInstance, null, new Object[] {mockTopicPattern}, new Class[] {Pattern.class}, null);
+        ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
+        assertThat(requiredInfo.getTopics(), is("test-.*"));
+    }
 }
\ No newline at end of file
diff --git a/test/plugin/scenarios/kafka-scenario/config/expectedData.yaml b/test/plugin/scenarios/kafka-scenario/config/expectedData.yaml
index 2e5da2a..1dee9ea 100644
--- a/test/plugin/scenarios/kafka-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/kafka-scenario/config/expectedData.yaml
@@ -125,3 +125,24 @@ segmentItems:
         parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not
           null, parentService: 'kafka-scenario', traceId: not null}
       skipAnalysis: 'false'
+  - segmentId: not null
+    spans:
+      - operationName: Kafka/test./Consumer/testGroup2
+        operationId: 0
+        parentSpanId: -1
+        spanId: 0
+        spanLayer: MQ
+        startTime: nq 0
+        endTime: nq 0
+        componentId: 41
+        isError: false
+        spanType: Entry
+        peer: ''
+        tags:
+          - {key: mq.broker, value: 'kafka-server:9092'}
+          - {key: mq.topic, value: test.}
+        refs:
+          - {parentEndpoint: /case/kafka-case, networkAddress: 'kafka-server:9092', refType: CrossProcess,
+             parentSpanId: 2, parentTraceSegmentId: not null, parentServiceInstance: not
+                                                                null, parentService: 'kafka-scenario', traceId: not null}
+        skipAnalysis: 'false'
diff --git a/test/plugin/scenarios/kafka-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/kafka/controller/CaseController.java b/test/plugin/scenarios/kafka-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/kafka/controller/CaseController.java
index e787c01..1e53178 100644
--- a/test/plugin/scenarios/kafka-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/kafka/controller/CaseController.java
+++ b/test/plugin/scenarios/kafka-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/kafka/controller/CaseController.java
@@ -21,10 +21,12 @@ package test.org.apache.skywalking.apm.testcase.kafka.controller;
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.function.Consumer;
+import java.util.regex.Pattern;
 import javax.annotation.PostConstruct;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -54,6 +56,7 @@ public class CaseController {
 
     private String topicName;
     private String topicName2;
+    private Pattern topicPattern;
 
     private static volatile boolean KAFKA_STATUS = false;
 
@@ -61,6 +64,7 @@ public class CaseController {
     private void setUp() {
         topicName = "test";
         topicName2 = "test2";
+        topicPattern = Pattern.compile("test.");
         new CheckKafkaProducerThread(bootstrapServers).start();
     }
 
@@ -84,10 +88,15 @@ public class CaseController {
             };
             producer.send(record2, callback2);
         }, bootstrapServers);
+
         Thread thread = new ConsumerThread();
         thread.start();
+
+        Thread thread2 = new ConsumerThread2();
+        thread2.start();
         try {
             thread.join();
+            thread2.join();
         } catch (InterruptedException e) {
             // ignore
         }
@@ -200,5 +209,44 @@ public class CaseController {
             consumer.close();
         }
     }
+
+    public class ConsumerThread2 extends Thread {
+        @Override
+        public void run() {
+            Properties consumerProperties = new Properties();
+            consumerProperties.put("bootstrap.servers", bootstrapServers);
+            consumerProperties.put("group.id", "testGroup2");
+            consumerProperties.put("enable.auto.commit", "true");
+            consumerProperties.put("auto.commit.interval.ms", "1000");
+            consumerProperties.put("auto.offset.reset", "earliest");
+            consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
+            consumer.subscribe(topicPattern, new NoOpConsumerRebalanceListener());
+            int i = 0;
+            while (i++ <= 10) {
+                try {
+                    Thread.sleep(1 * 1000);
+                } catch (InterruptedException e) {
+                }
+
+                ConsumerRecords<String, String> records = consumer.poll(100);
+
+                if (!records.isEmpty()) {
+                    for (ConsumerRecord<String, String> record : records) {
+                        logger.info("header: {}", new String(record.headers()
+                          .headers("TEST")
+                          .iterator()
+                          .next()
+                          .value()));
+                        logger.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
+                    }
+                    break;
+                }
+            }
+
+            consumer.close();
+        }
+    }
 }