You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/11/03 06:41:51 UTC
[skywalking-java] branch main updated: Support mannual propagation of tracing context to next stream for webflux. (#371)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git
The following commit(s) were added to refs/heads/main by this push:
new 64a33b24a4 Support mannual propagation of tracing context to next stream for webflux. (#371)
64a33b24a4 is described below
commit 64a33b24a45fb01affb419b4614e612c8b304961
Author: wuwen <wu...@aliyun.com>
AuthorDate: Thu Nov 3 14:41:45 2022 +0800
Support mannual propagation of tracing context to next stream for webflux. (#371)
---
CHANGES.md | 3 +
.../{ => apm-toolkit-webflux}/pom.xml | 35 +++---
.../webflux/WebFluxSkyWalkingOperators.java | 138 +++++++++++++++++++++
apm-application-toolkit/pom.xml | 1 +
.../apm-toolkit-webflux-activation}/pom.xml | 31 ++---
.../WebFluxSkyWalkingOperatorsActivation.java | 76 ++++++++++++
.../WebFluxSkyWalkingOperatorsInterceptor.java | 80 ++++++++++++
.../src/main/resources/skywalking-plugin.def | 17 +++
apm-sniffer/apm-toolkit-activation/pom.xml | 1 +
.../java-agent/Application-toolkit-kafka.md | 55 ++++++++
.../java-agent/Application-toolkit-webflux.md | 52 ++++++++
.../setup/service-agent/java-agent/Plugin-list.md | 1 +
docs/menu.yml | 4 +
.../webflux-scenario/config/expectedData.yaml | 93 +++++++++++++-
.../projectA/controller/TestController.java | 1 +
.../webflux-projectB-scenario/pom.xml | 5 +
.../webflux/WebFluxSkyWalkingOperators.java | 138 +++++++++++++++++++++
.../controller/TestAnnotationController.java | 21 ++++
.../sc/webflux/projectB/utils/HttpUtils.java | 44 +++++++
19 files changed, 764 insertions(+), 32 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 6cb3f36630..bd4bd5daba 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -24,6 +24,7 @@ Release Notes.
* Polish up activemq plugin to fix missing broker tag on consumer side
* Enhance MQ plugin relative tests to check key tags not blank.
* Add RocketMQ test scenarios for version 4.3 - 4.9. No 4.0 - 4.2 release images for testing.
+* Support mannual propagation of tracing context to next operators for webflux.
* Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span.
#### Documentation
@@ -32,6 +33,8 @@ Release Notes.
* Update plugin dev tags for cache relative tags.
* Add plugin dev docs for virtual database tags.
* Add plugin dev docs for virtual MQ tags.
+* Add doc about kafka plugin Manual APIs.
+
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/150?closed=1)
diff --git a/apm-application-toolkit/pom.xml b/apm-application-toolkit/apm-toolkit-webflux/pom.xml
similarity index 64%
copy from apm-application-toolkit/pom.xml
copy to apm-application-toolkit/apm-toolkit-webflux/pom.xml
index 3733377c47..e2a320c9a8 100644
--- a/apm-application-toolkit/pom.xml
+++ b/apm-application-toolkit/apm-toolkit-webflux/pom.xml
@@ -1,3 +1,4 @@
+<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
@@ -15,25 +16,27 @@
~ limitations under the License.
~
-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>java-agent</artifactId>
+ <artifactId>apm-application-toolkit</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>apm-application-toolkit</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>apm-toolkit-log4j-1.x</module>
- <module>apm-toolkit-log4j-2.x</module>
- <module>apm-toolkit-logback-1.x</module>
- <module>apm-toolkit-opentracing</module>
- <module>apm-toolkit-trace</module>
- <module>apm-toolkit-meter</module>
- <module>apm-toolkit-micrometer-registry</module>
- <module>apm-toolkit-kafka</module>
- </modules>
-</project>
+ <artifactId>apm-toolkit-webflux</artifactId>
+
+ <properties>
+ <spring-webflux.version>5.1.0.RELEASE</spring-webflux.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-webflux</artifactId>
+ <version>${spring-webflux.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/apm-application-toolkit/apm-toolkit-webflux/src/main/java/org/apache/skywalking/apm/toolkit/webflux/WebFluxSkyWalkingOperators.java b/apm-application-toolkit/apm-toolkit-webflux/src/main/java/org/apache/skywalking/apm/toolkit/webflux/WebFluxSkyWalkingOperators.java
new file mode 100644
index 0000000000..70ebacc108
--- /dev/null
+++ b/apm-application-toolkit/apm-toolkit-webflux/src/main/java/org/apache/skywalking/apm/toolkit/webflux/WebFluxSkyWalkingOperators.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.toolkit.webflux;
+
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Signal;
+import reactor.core.publisher.SignalType;
+import reactor.util.context.Context;
+
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+/**
+ * WebFlux operators that are capable to reuse tracing context from Reactor's Context.
+ */
+public final class WebFluxSkyWalkingOperators {
+
+ private WebFluxSkyWalkingOperators() {
+ throw new IllegalStateException("You can't instantiate a utility class");
+ }
+
+ /**
+ * Wraps a runnable with a local span and continue tracing context.
+ *
+ * @param signalType - Reactor's signal type
+ * @param runnable - lambda to execute within the tracing context
+ * @return consumer of a signal
+ */
+ public static Consumer<Signal<?>> continueTracing(SignalType signalType, Runnable runnable) {
+ return signal -> {
+ if (signalType != signal.getType()) {
+ return;
+ }
+ continueTracing(runnable).accept(signal);
+ };
+ }
+
+ /**
+ * Wraps a consumer with a local span and continue tracing context.
+ *
+ * @param signalType - Reactor's signal type
+ * @param consumer - lambda to execute within the tracing context
+ * @return consumer of a signal
+ */
+ public static Consumer<Signal> continueTracing(SignalType signalType, Consumer<Signal> consumer) {
+ return signal -> {
+ if (signalType != signal.getType()) {
+ return;
+ }
+ continueTracing(signal.getContext(), () -> consumer.accept(signal));
+ };
+ }
+
+ /**
+ * Wraps a runnable with a local span and continue tracing context.
+ *
+ * @param runnable - lambda to execute within the tracing context
+ * @return consumer of a signal
+ */
+ public static Consumer<Signal> continueTracing(Runnable runnable) {
+ return signal -> {
+ Context context = signal.getContext();
+ continueTracing(context, runnable);
+ };
+ }
+
+ /**
+ * Wraps a runnable with a local span and continue tracing context.
+ *
+ * @param context - Reactor context that contains the tracing context
+ * @param runnable - lambda to execute within the tracing context
+ */
+ public static void continueTracing(Context context, Runnable runnable) {
+ runnable.run();
+ }
+
+ /**
+ * Wraps a callable with a local span and continue tracing context.
+ *
+ * @param context - Reactor context that contains the tracing context
+ * @param callable - lambda to execute within the tracing context
+ * @param <T> callable's return type
+ * @return value from the callable
+ */
+ public static <T> T continueTracing(Context context, Callable<T> callable) {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ return sneakyThrow(e);
+ }
+ }
+
+ /**
+ * Wraps a callable with a local span and continue tracing context.
+ *
+ * @param serverWebExchange - EnhancedInstance that contains the tracing context
+ * @param callable - lambda to execute within the tracing context
+ * @param <T> callable's return type
+ * @return value from the callable
+ */
+ public static <T> T continueTracing(ServerWebExchange serverWebExchange, Callable<T> callable) {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ return sneakyThrow(e);
+ }
+ }
+
+ /**
+ * Wraps a runnable with a local span and continue tracing context.
+ *
+ * @param serverWebExchange - EnhancedInstance that contains the tracing context
+ * @param runnable - lambda to execute within the tracing context
+ */
+ public static void continueTracing(ServerWebExchange serverWebExchange, Runnable runnable) {
+ runnable.run();
+ }
+
+ private static <T extends Throwable, R> R sneakyThrow(Throwable t) throws T {
+ throw (T) t;
+ }
+}
diff --git a/apm-application-toolkit/pom.xml b/apm-application-toolkit/pom.xml
index 3733377c47..4444989719 100644
--- a/apm-application-toolkit/pom.xml
+++ b/apm-application-toolkit/pom.xml
@@ -35,5 +35,6 @@
<module>apm-toolkit-meter</module>
<module>apm-toolkit-micrometer-registry</module>
<module>apm-toolkit-kafka</module>
+ <module>apm-toolkit-webflux</module>
</modules>
</project>
diff --git a/apm-application-toolkit/pom.xml b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/pom.xml
similarity index 64%
copy from apm-application-toolkit/pom.xml
copy to apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/pom.xml
index 3733377c47..c9d473b86e 100644
--- a/apm-application-toolkit/pom.xml
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/pom.xml
@@ -1,3 +1,4 @@
+<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
@@ -16,24 +17,26 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>java-agent</artifactId>
+ <artifactId>apm-toolkit-activation</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>apm-application-toolkit</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>apm-toolkit-log4j-1.x</module>
- <module>apm-toolkit-log4j-2.x</module>
- <module>apm-toolkit-logback-1.x</module>
- <module>apm-toolkit-opentracing</module>
- <module>apm-toolkit-trace</module>
- <module>apm-toolkit-meter</module>
- <module>apm-toolkit-micrometer-registry</module>
- <module>apm-toolkit-kafka</module>
- </modules>
+ <artifactId>apm-toolkit-webflux-activation</artifactId>
+
+ <properties>
+ <spring-webflux.version>5.1.0.RELEASE</spring-webflux.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-webflux</artifactId>
+ <version>${spring-webflux.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
</project>
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/webflux/WebFluxSkyWalkingOperatorsActivation.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/webflux/WebFluxSkyWalkingOperatorsActivation.java
new file mode 100644
index 0000000000..09d9b81b0a
--- /dev/null
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/webflux/WebFluxSkyWalkingOperatorsActivation.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.toolkit.activation.webflux;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassStaticMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ */
+public class WebFluxSkyWalkingOperatorsActivation extends ClassStaticMethodsEnhancePluginDefine {
+
+ public static final String INTERCEPT_CLASS =
+ "org.apache.skywalking.apm.toolkit.activation.webflux.WebFluxSkyWalkingOperatorsInterceptor";
+ public static final String ENHANCE_CLASS =
+ "org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators";
+ public static final String ENHANCE_METHOD = "continueTracing";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return null;
+ }
+
+ @Override
+ public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
+ return new StaticMethodsInterceptPoint[] {
+ new StaticMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription> getMethodsMatcher() {
+ return named(ENHANCE_METHOD).and(takesArguments(2))
+ .and(takesArgument(0, named("reactor.util.context.Context"))
+ .or(takesArgument(0, named("org.springframework.web.server.ServerWebExchange"))));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return INTERCEPT_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/webflux/WebFluxSkyWalkingOperatorsInterceptor.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/webflux/WebFluxSkyWalkingOperatorsInterceptor.java
new file mode 100644
index 0000000000..679710700f
--- /dev/null
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/webflux/WebFluxSkyWalkingOperatorsInterceptor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.toolkit.activation.webflux;
+
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.springframework.web.server.ServerWebExchange;
+import org.springframework.web.server.ServerWebExchangeDecorator;
+import org.springframework.web.server.adapter.DefaultServerWebExchange;
+import reactor.util.context.Context;
+
+import java.lang.reflect.Method;
+
+/**
+ */
+public class WebFluxSkyWalkingOperatorsInterceptor implements StaticMethodsAroundInterceptor {
+
+ @Override
+ public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+ MethodInterceptResult result) {
+ // get ContextSnapshot from reactor context, the snapshot is set to reactor context by any other plugin
+ // such as DispatcherHandlerHandleMethodInterceptor in spring-webflux-5.x-plugin
+ if (parameterTypes[0] == Context.class) {
+ ((Context) allArguments[0]).getOrEmpty("SKYWALKING_CONTEXT_SNAPSHOT")
+ .ifPresent(ctx -> {
+ ContextManager.createLocalSpan("WebFluxOperators/onNext").setComponent(ComponentsDefine.SPRING_WEBFLUX);
+ ContextManager.continued((ContextSnapshot) ctx);
+ });
+ } else if (parameterTypes[0] == ServerWebExchange.class) {
+ EnhancedInstance instance = getInstance(allArguments[0]);
+ if (instance != null && instance.getSkyWalkingDynamicField() != null) {
+ ContextManager.createLocalSpan("WebFluxOperators/onNext").setComponent(ComponentsDefine.SPRING_WEBFLUX);
+ ContextManager.continued((ContextSnapshot) instance.getSkyWalkingDynamicField());
+ }
+ }
+ }
+
+ @Override
+ public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes, Object ret) {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+ Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+
+ private static EnhancedInstance getInstance(Object o) {
+ EnhancedInstance instance = null;
+ if (o instanceof DefaultServerWebExchange && o instanceof EnhancedInstance) {
+ instance = (EnhancedInstance) o;
+ } else if (o instanceof ServerWebExchangeDecorator) {
+ ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate();
+ return getInstance(delegate);
+ }
+ return instance;
+ }
+}
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/resources/skywalking-plugin.def
new file mode 100644
index 0000000000..de46b3ce11
--- /dev/null
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/resources/skywalking-plugin.def
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+toolkit-webflux=org.apache.skywalking.apm.toolkit.activation.webflux.WebFluxSkyWalkingOperatorsActivation
\ No newline at end of file
diff --git a/apm-sniffer/apm-toolkit-activation/pom.xml b/apm-sniffer/apm-toolkit-activation/pom.xml
index b299999d23..1d7ca145ee 100644
--- a/apm-sniffer/apm-toolkit-activation/pom.xml
+++ b/apm-sniffer/apm-toolkit-activation/pom.xml
@@ -33,6 +33,7 @@
<module>apm-toolkit-opentracing-activation</module>
<module>apm-toolkit-trace-activation</module>
<module>apm-toolkit-meter-activation</module>
+ <module>apm-toolkit-webflux-activation</module>
<module>apm-toolkit-logging-common</module>
</modules>
diff --git a/docs/en/setup/service-agent/java-agent/Application-toolkit-kafka.md b/docs/en/setup/service-agent/java-agent/Application-toolkit-kafka.md
new file mode 100644
index 0000000000..890fac223d
--- /dev/null
+++ b/docs/en/setup/service-agent/java-agent/Application-toolkit-kafka.md
@@ -0,0 +1,55 @@
+# Kafka Poll And Invoke
+* Dependency the toolkit, such as using maven or gradle
+
+```xml
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>apm-toolkit-kafka</artifactId>
+ <version>${skywalking.version}</version>
+ </dependency>
+```
+
+* usage 1.
+```java
+ public class ConsumerThread2 extends Thread {
+ @Override
+ public void run() {
+ Properties consumerProperties = new Properties();
+ //...consumerProperties.put()
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
+ consumer.subscribe(topicPattern, new NoOpConsumerRebalanceListener());
+ while (true) {
+ if (pollAndInvoke(consumer)) break;
+ }
+ consumer.close();
+ }
+
+ @KafkaPollAndInvoke
+ private boolean pollAndInvoke(KafkaConsumer<String, String> consumer) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+
+ ConsumerRecords<String, String> records = consumer.poll(100);
+
+ if (!records.isEmpty()) {
+ OkHttpClient client = new OkHttpClient.Builder().build();
+ Request request = new Request.Builder().url("http://localhost:8080/kafka-scenario/case/kafka-thread2-ping").build();
+ Response response = null;
+ try {
+ response = client.newCall(request).execute();
+ } catch (IOException e) {
+ }
+ response.body().close();
+ return true;
+ }
+ return false;
+ }
+}
+```
+
+_Sample codes only_
+
+
+
diff --git a/docs/en/setup/service-agent/java-agent/Application-toolkit-webflux.md b/docs/en/setup/service-agent/java-agent/Application-toolkit-webflux.md
new file mode 100644
index 0000000000..32a2662384
--- /dev/null
+++ b/docs/en/setup/service-agent/java-agent/Application-toolkit-webflux.md
@@ -0,0 +1,52 @@
+# Mannual propagation of tracing context for Webflux
+* Dependency the toolkit, such as using maven or gradle
+```xml
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>apm-toolkit-webflux</artifactId>
+ <version>${skywalking.version}</version>
+ </dependency>
+```
+
+* usage 1.
+```java
+ @GetMapping("/testcase/annotation/mono/onnext")
+ public Mono<String> monoOnNext(@RequestBody(required = false) String body) {
+ return Mono.subscriberContext()
+ .flatMap(ctx -> WebFluxSkyWalkingOperators.continueTracing(ctx, () -> {
+ visit("http://localhost:" + serverPort + "/testcase/success");
+ return Mono.just("Hello World");
+ }));
+ }
+```
+* usage 2.
+```java
+ @GetMapping("/login/userFunctions")
+ public Mono<Response<FunctionInfoResult>> functionInfo(ServerWebExchange exchange, @RequestParam String userId) {
+ return ReactiveSecurityContextHolder.getContext()
+ .flatMap(context -> {
+ return exchange.getSession().map(session -> WebFluxSkyWalkingOperators.continueTracing(exchange, () -> handle(session, userId)));
+ });
+ }
+
+ private Response<FunctionInfoResult> handle(WebSession session, String userId) {
+ //...dubbo rpc
+ }
+```
+
+* usage 3.
+```java
+ Mono.just("key").subscribeOn(Schedulers.boundedElastic())
+ .doOnEach(WebFluxSkyWalkingOperators.continueTracing(SignalType.ON_NEXT, () -> log.info("test log with tid")))
+ .flatMap(key -> Mono.deferContextual(ctx -> WebFluxSkyWalkingOperators.continueTracing(Context.of(ctx), () -> {
+ redis.hasKey(key);
+ return Mono.just("SUCCESS");
+ })
+ ));
+...
+```
+
+_Sample codes only_
+
+
+
diff --git a/docs/en/setup/service-agent/java-agent/Plugin-list.md b/docs/en/setup/service-agent/java-agent/Plugin-list.md
index cf387807e9..1390befff3 100644
--- a/docs/en/setup/service-agent/java-agent/Plugin-list.md
+++ b/docs/en/setup/service-agent/java-agent/Plugin-list.md
@@ -122,6 +122,7 @@
- toolkit-tag
- toolkit-trace
- toolkit-exception
+- toolkit-webflux
- undertow-2.x-plugin
- vertx-core-3.x
- vertx-core-4.x
diff --git a/docs/menu.yml b/docs/menu.yml
index 9a56155bbd..13ca93a3b5 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -46,6 +46,10 @@ catalog:
path: "/en/setup/service-agent/java-agent/application-toolkit-trace-cross-thread"
- name: "MicroMeter Registry"
path: "/en/setup/service-agent/java-agent/application-toolkit-micrometer"
+ - name: "Webflux Tracing Assistant APIs"
+ path: "/en/setup/service-agent/java-agent/application-toolkit-webflux"
+ - name: "Kafka Tracing Assistant APIs"
+ path: "/en/setup/service-agent/java-agent/application-toolkit-kafka"
- name: "Tolerate Custom Exceptions"
path: "/en/setup/service-agent/java-agent/how-to-tolerate-exceptions"
- name: "Log & Trace Correlation"
diff --git a/test/plugin/scenarios/webflux-scenario/config/expectedData.yaml b/test/plugin/scenarios/webflux-scenario/config/expectedData.yaml
index abea127316..3dc9ce8d17 100644
--- a/test/plugin/scenarios/webflux-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/webflux-scenario/config/expectedData.yaml
@@ -184,7 +184,81 @@ segmentItems:
- {parentEndpoint: GET:/projectA/testcase, networkAddress: 'localhost:18080', refType: CrossProcess,
parentSpanId: 8, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: not null, traceId: not null}
+ skipAnalysis: 'false'
+ - segmentId: not null
+ spans:
+ - operationName: /testcase/success
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ startTime: not null
+ endTime: not null
+ componentId: 67
+ isError: false
+ spanType: Entry
+ peer: ''
+ skipAnalysis: 'false'
+ tags:
+ - {key: url, value: 'http://localhost:18080/testcase/success'}
+ - {key: http.method, value: GET}
+ - {key: http.status_code, value: '200'}
+ refs:
+ - {parentEndpoint: WebFluxOperators/onNext, networkAddress: 'localhost:18080', refType: CrossProcess,
+ parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not
+ null, parentService: not null, traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName: /testcase/success
+ parentSpanId: 0
+ spanId: 1
+ spanLayer: Http
+ startTime: not null
+ endTime: not null
+ componentId: 2
+ isError: false
+ spanType: Exit
+ peer: not null
skipAnalysis: 'false'
+ tags:
+ - {key: url, value: 'http://localhost:18080/testcase/success'}
+ - {key: http.method, value: GET}
+ - {key: http.status_code, value: '200'}
+ - operationName: WebFluxOperators/onNext
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Unknown
+ startTime: not null
+ endTime: not null
+ componentId: 67
+ isError: false
+ spanType: Local
+ peer: ''
+ refs:
+ - {parentEndpoint: /testcase/annotation/mono/onnext, networkAddress: '', refType: CrossThread,
+ parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not
+ null, parentService: not null, traceId: not null}
+ skipAnalysis: 'false'
+ - segmentId: not null
+ spans:
+ - operationName: /testcase/annotation/mono/onnext
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ startTime: not null
+ endTime: not null
+ componentId: 67
+ isError: false
+ spanType: Entry
+ peer: ''
+ tags:
+ - {key: url, value: 'http://localhost:18080/testcase/annotation/mono/onnext'}
+ - {key: http.method, value: GET}
+ - {key: http.status_code, value: '200'}
+ refs:
+ - {parentEndpoint: 'GET:/projectA/testcase', networkAddress: 'localhost:18080', refType: CrossProcess,
+ parentSpanId: 9, parentTraceSegmentId: not null, parentServiceInstance: not
+ null, parentService: not null, traceId: not null}
+ skipAnalysis: 'false'
- segmentId: not null
spans:
- operationName: /testcase/webclient/server
@@ -203,7 +277,7 @@ segmentItems:
- {key: http.status_code, value: '200'}
refs:
- {parentEndpoint: GET:/projectA/testcase, networkAddress: 'localhost:18080', refType: CrossProcess,
- parentSpanId: 9, parentTraceSegmentId: not null, parentServiceInstance: not
+ parentSpanId: 10, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: not null, traceId: not null}
skipAnalysis: 'false'
- serviceName: webflux-projectA-scenario
@@ -331,12 +405,27 @@ segmentItems:
- {key: http.method, value: GET}
- {key: http.status_code, value: '200'}
skipAnalysis: 'false'
- - operationName: /testcase/webclient/server
+ - operationName: /testcase/annotation/mono/onnext
parentSpanId: 0
spanId: 9
spanLayer: Http
startTime: not null
endTime: not null
+ componentId: 2
+ isError: false
+ spanType: Exit
+ peer: not null
+ tags:
+ - {key: url, value: not null}
+ - {key: http.method, value: GET}
+ - {key: http.status_code, value: '200'}
+ skipAnalysis: 'false'
+ - operationName: /testcase/webclient/server
+ parentSpanId: 0
+ spanId: 10
+ spanLayer: Http
+ startTime: not null
+ endTime: not null
componentId: 99
isError: false
spanType: Exit
diff --git a/test/plugin/scenarios/webflux-scenario/webflux-projectA-scenario/src/main/java/org/apache/skywalking/apm/testcase/sc/webflux/projectA/controller/TestController.java b/test/plugin/scenarios/webflux-scenario/webflux-projectA-scenario/src/main/java/org/apache/skywalking/apm/testcase/sc/webflux/projectA/controller/TestController.java
index b11f74373b..45eca4b9a7 100644
--- a/test/plugin/scenarios/webflux-scenario/webflux-projectA-scenario/src/main/java/org/apache/skywalking/apm/testcase/sc/webflux/projectA/controller/TestController.java
+++ b/test/plugin/scenarios/webflux-scenario/webflux-projectA-scenario/src/main/java/org/apache/skywalking/apm/testcase/sc/webflux/projectA/controller/TestController.java
@@ -46,6 +46,7 @@ public class TestController {
visit("http://" + hostBAddress + "/testcase/route/error");
visit("http://" + hostBAddress + "/notFound");
visit("http://" + hostBAddress + "/testcase/annotation/mono/hello");
+ visit("http://" + hostBAddress + "/testcase/annotation/mono/onnext");
testGet("http://" + hostBAddress + "/testcase/webclient/server");
return "test";
}
diff --git a/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/pom.xml b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/pom.xml
index 792dc6df9a..79bc16984b 100644
--- a/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/pom.xml
+++ b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/pom.xml
@@ -34,6 +34,11 @@
<artifactId>spring-boot-starter-webflux</artifactId>
<version>${test.framework.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.6</version>
+ </dependency>
</dependencies>
<build>
diff --git a/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/org/apache/skywalking/apm/toolkit/webflux/WebFluxSkyWalkingOperators.java b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/org/apache/skywalking/apm/toolkit/webflux/WebFluxSkyWalkingOperators.java
new file mode 100644
index 0000000000..73b507bfbd
--- /dev/null
+++ b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/org/apache/skywalking/apm/toolkit/webflux/WebFluxSkyWalkingOperators.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.toolkit.webflux;
+
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Signal;
+import reactor.core.publisher.SignalType;
+import reactor.util.context.Context;
+
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+/**
+ * WebFlux operators that are capable to reuse tracing context from Reactor's Context.
+ */
+public final class WebFluxSkyWalkingOperators {
+
+ private WebFluxSkyWalkingOperators() {
+ throw new IllegalStateException("You can't instantiate a utility class");
+ }
+
+ /**
+ * Wraps a runnable with a local span and continue tracing context.
+ *
+ * @param signalType - Reactor's signal type
+ * @param runnable - lambda to execute within the tracing context
+ * @return consumer of a signal
+ */
+ public static Consumer<Signal<?>> continueTracing(SignalType signalType, Runnable runnable) {
+ return signal -> {
+ if (signalType != signal.getType()) {
+ return;
+ }
+ continueTracing(runnable).accept(signal);
+ };
+ }
+
+ /**
+ * Wraps a consumer with a local span and continue tracing context.
+ *
+ * @param signalType - Reactor's signal type
+ * @param consumer - lambda to execute within the tracing context
+ * @return consumer of a signal
+ */
+ public static Consumer<Signal> continueTracing(SignalType signalType, Consumer<Signal> consumer) {
+ return signal -> {
+ if (signalType != signal.getType()) {
+ return;
+ }
+ continueTracing(signal.getContext(), () -> consumer.accept(signal));
+ };
+ }
+
+ /**
+ * Wraps a runnable with a local span and continue tracing context.
+ *
+ * @param runnable - lambda to execute within the tracing context
+ * @return consumer of a signal
+ */
+ public static Consumer<Signal> continueTracing(Runnable runnable) {
+ return signal -> {
+ Context context = signal.getContext();
+ continueTracing(context, runnable);
+ };
+ }
+
+ /**
+ * Wraps a runnable with a local span and continue tracing context.
+ *
+ * @param context - Reactor context that contains the tracing context
+ * @param runnable - lambda to execute within the tracing context
+ */
+ public static void continueTracing(Context context, Runnable runnable) {
+ runnable.run();
+ }
+
+ /**
+ * Wraps a callable with a local span and continue tracing context.
+ *
+ * @param context - Reactor context that contains the tracing context
+ * @param callable - lambda to execute within the tracing context
+ * @param <T> callable's return type
+ * @return value from the callable
+ */
+ public static <T> T continueTracing(Context context, Callable<T> callable) {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ return sneakyThrow(e);
+ }
+ }
+
+ /**
+ * Wraps a callable with a local span and continue tracing context.
+ *
+ * @param serverWebExchange - EnhancedInstance that contains the tracing context
+ * @param callable - lambda to execute within the tracing context
+ * @param <T> callable's return type
+ * @return value from the callable
+ */
+ public static <T> T continueTracing(ServerWebExchange serverWebExchange, Callable<T> callable) {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ return sneakyThrow(e);
+ }
+ }
+
+ /**
+ * Wraps a runnable with a local span and continue tracing context.
+ *
+ * @param serverWebExchange - EnhancedInstance that contains the tracing context
+ * @param runnable - lambda to execute within the tracing context
+ */
+ public static void continueTracing(ServerWebExchange serverWebExchange, Runnable runnable) {
+ runnable.run();
+ }
+
+ private static <T extends Throwable, R> R sneakyThrow(Throwable t) throws T {
+ throw (T) t;
+ }
+}
diff --git a/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/webflux/projectB/controller/TestAnnotationController.java b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/webflux/projectB/controller/TestAnnotationController.java
index b391f40bed..f487ec5f28 100644
--- a/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/webflux/projectB/controller/TestAnnotationController.java
+++ b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/webflux/projectB/controller/TestAnnotationController.java
@@ -17,6 +17,8 @@
package test.apache.skywalking.apm.testcase.sc.webflux.projectB.controller;
+import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
@@ -24,9 +26,14 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
+import static test.apache.skywalking.apm.testcase.sc.webflux.projectB.utils.HttpUtils.visit;
+
@RestController
public class TestAnnotationController {
+ @Value("${server.port:18080}")
+ private String serverPort;
+
@RequestMapping("/testcase/annotation/healthCheck")
public String healthCheck() {
return "healthCheck";
@@ -59,4 +66,18 @@ public class TestAnnotationController {
public Mono<String> hello(@RequestBody(required = false) String body) {
return Mono.just("Hello World");
}
+
+ @RequestMapping("/testcase/success")
+ public String downstreamMock() {
+ return "1";
+ }
+
+ @GetMapping("/testcase/annotation/mono/onnext")
+ public Mono<String> monoOnNext(@RequestBody(required = false) String body) {
+ return Mono.subscriberContext()
+ .flatMap(ctx -> WebFluxSkyWalkingOperators.continueTracing(ctx, () -> {
+ visit("http://localhost:" + serverPort + "/testcase/success");
+ return Mono.just("Hello World");
+ }));
+ }
}
diff --git a/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/webflux/projectB/utils/HttpUtils.java b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/webflux/projectB/utils/HttpUtils.java
new file mode 100644
index 0000000000..e79afc034a
--- /dev/null
+++ b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/webflux/projectB/utils/HttpUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.apache.skywalking.apm.testcase.sc.webflux.projectB.utils;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+
+public class HttpUtils {
+
+ public static String visit(String url) throws IOException {
+ CloseableHttpClient httpclient = HttpClients.createDefault();
+ try {
+ HttpGet httpget = new HttpGet(url);
+ ResponseHandler<String> responseHandler = response -> {
+ HttpEntity entity = response.getEntity();
+ return entity != null ? EntityUtils.toString(entity) : null;
+ };
+ return httpclient.execute(httpget, responseHandler);
+ } finally {
+ httpclient.close();
+ }
+ }
+}