You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/11/03 06:41:51 UTC

[skywalking-java] branch main updated: Support mannual propagation of tracing context to next stream for webflux. (#371)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git


The following commit(s) were added to refs/heads/main by this push:
     new 64a33b24a4 Support mannual propagation of tracing context to next stream for webflux. (#371)
64a33b24a4 is described below

commit 64a33b24a45fb01affb419b4614e612c8b304961
Author: wuwen <wu...@aliyun.com>
AuthorDate: Thu Nov 3 14:41:45 2022 +0800

    Support mannual propagation of tracing context to next stream for webflux. (#371)
---
 CHANGES.md                                         |   3 +
 .../{ => apm-toolkit-webflux}/pom.xml              |  35 +++---
 .../webflux/WebFluxSkyWalkingOperators.java        | 138 +++++++++++++++++++++
 apm-application-toolkit/pom.xml                    |   1 +
 .../apm-toolkit-webflux-activation}/pom.xml        |  31 ++---
 .../WebFluxSkyWalkingOperatorsActivation.java      |  76 ++++++++++++
 .../WebFluxSkyWalkingOperatorsInterceptor.java     |  80 ++++++++++++
 .../src/main/resources/skywalking-plugin.def       |  17 +++
 apm-sniffer/apm-toolkit-activation/pom.xml         |   1 +
 .../java-agent/Application-toolkit-kafka.md        |  55 ++++++++
 .../java-agent/Application-toolkit-webflux.md      |  52 ++++++++
 .../setup/service-agent/java-agent/Plugin-list.md  |   1 +
 docs/menu.yml                                      |   4 +
 .../webflux-scenario/config/expectedData.yaml      |  93 +++++++++++++-
 .../projectA/controller/TestController.java        |   1 +
 .../webflux-projectB-scenario/pom.xml              |   5 +
 .../webflux/WebFluxSkyWalkingOperators.java        | 138 +++++++++++++++++++++
 .../controller/TestAnnotationController.java       |  21 ++++
 .../sc/webflux/projectB/utils/HttpUtils.java       |  44 +++++++
 19 files changed, 764 insertions(+), 32 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 6cb3f36630..bd4bd5daba 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -24,6 +24,7 @@ Release Notes.
 * Polish up activemq plugin to fix missing broker tag on consumer side
 * Enhance MQ plugin relative tests to check key tags not blank.
 * Add RocketMQ test scenarios for version 4.3 - 4.9. No 4.0 - 4.2 release images for testing.
+* Support manual propagation of tracing context to next operators for webflux.
 * Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span. 
 
 #### Documentation
@@ -32,6 +33,8 @@ Release Notes.
 * Update plugin dev tags for cache relative tags.
 * Add plugin dev docs for virtual database tags.
 * Add plugin dev docs for virtual MQ tags.
+* Add doc about kafka plugin Manual APIs.
+
 
 All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/150?closed=1)
 
diff --git a/apm-application-toolkit/pom.xml b/apm-application-toolkit/apm-toolkit-webflux/pom.xml
similarity index 64%
copy from apm-application-toolkit/pom.xml
copy to apm-application-toolkit/apm-toolkit-webflux/pom.xml
index 3733377c47..e2a320c9a8 100644
--- a/apm-application-toolkit/pom.xml
+++ b/apm-application-toolkit/apm-toolkit-webflux/pom.xml
@@ -1,3 +1,4 @@
+<?xml version="1.0" encoding="UTF-8"?>
 <!--
   ~ Licensed to the Apache Software Foundation (ASF) under one or more
   ~ contributor license agreements.  See the NOTICE file distributed with
@@ -15,25 +16,27 @@
   ~ limitations under the License.
   ~
   -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>java-agent</artifactId>
+        <artifactId>apm-application-toolkit</artifactId>
         <groupId>org.apache.skywalking</groupId>
         <version>8.13.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>apm-application-toolkit</artifactId>
-    <packaging>pom</packaging>
 
-    <modules>
-        <module>apm-toolkit-log4j-1.x</module>
-        <module>apm-toolkit-log4j-2.x</module>
-        <module>apm-toolkit-logback-1.x</module>
-        <module>apm-toolkit-opentracing</module>
-        <module>apm-toolkit-trace</module>
-        <module>apm-toolkit-meter</module>
-        <module>apm-toolkit-micrometer-registry</module>
-        <module>apm-toolkit-kafka</module>
-    </modules>
-</project>
+    <artifactId>apm-toolkit-webflux</artifactId>
+
+    <properties>
+        <spring-webflux.version>5.1.0.RELEASE</spring-webflux.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-webflux</artifactId>
+            <version>${spring-webflux.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/apm-application-toolkit/apm-toolkit-webflux/src/main/java/org/apache/skywalking/apm/toolkit/webflux/WebFluxSkyWalkingOperators.java b/apm-application-toolkit/apm-toolkit-webflux/src/main/java/org/apache/skywalking/apm/toolkit/webflux/WebFluxSkyWalkingOperators.java
new file mode 100644
index 0000000000..70ebacc108
--- /dev/null
+++ b/apm-application-toolkit/apm-toolkit-webflux/src/main/java/org/apache/skywalking/apm/toolkit/webflux/WebFluxSkyWalkingOperators.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.toolkit.webflux;
+
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Signal;
+import reactor.core.publisher.SignalType;
+import reactor.util.context.Context;
+
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+/**
+ * WebFlux operators that are capable of reusing the tracing context from Reactor's Context or a ServerWebExchange.
+ */
+public final class WebFluxSkyWalkingOperators {
+
+    private WebFluxSkyWalkingOperators() {
+        throw new IllegalStateException("You can't instantiate a utility class");
+    }
+
+    /**
+     * Wraps a runnable so that it runs within the tracing context, but only for signals of one type.
+     *
+     * @param signalType - Reactor's signal type to react to; signals of any other type are ignored
+     * @param runnable   - lambda to execute within the tracing context
+     * @return consumer of a signal
+     */
+    public static Consumer<Signal<?>> continueTracing(SignalType signalType, Runnable runnable) {
+        return signal -> {
+            if (signalType != signal.getType()) {
+                return;
+            }
+            continueTracing(runnable).accept(signal); // delegates to the Runnable overload, which reads the signal's context
+        };
+    }
+
+    /**
+     * Wraps a signal consumer so that it runs within the tracing context, but only for signals of one type.
+     *
+     * @param signalType - Reactor's signal type to react to; signals of any other type are ignored
+     * @param consumer   - lambda that receives the matching signal within the tracing context
+     * @return consumer of a signal
+     */
+    public static Consumer<Signal> continueTracing(SignalType signalType, Consumer<Signal> consumer) {
+        return signal -> {
+            if (signalType != signal.getType()) {
+                return;
+            }
+            continueTracing(signal.getContext(), () -> consumer.accept(signal));
+        };
+    }
+
+    /**
+     * Wraps a runnable so that it runs, for every signal, within the tracing context of that signal's Reactor context.
+     *
+     * @param runnable - lambda to execute within the tracing context
+     * @return consumer of a signal
+     */
+    public static Consumer<Signal> continueTracing(Runnable runnable) {
+        return signal -> {
+            Context context = signal.getContext();
+            continueTracing(context, runnable);
+        };
+    }
+
+    /**
+     * Runs a runnable; the agent's toolkit-webflux plugin wraps this call with a local span continuing the trace.
+     *
+     * @param context  - Reactor context that contains the tracing context
+     * @param runnable - lambda to execute within the tracing context
+     */
+    public static void continueTracing(Context context, Runnable runnable) {
+        runnable.run(); // context is not used here; it is read by the agent's interceptor
+    }
+
+    /**
+     * Runs a callable; the agent's toolkit-webflux plugin wraps this call with a local span continuing the trace.
+     *
+     * @param context  - Reactor context that contains the tracing context
+     * @param callable - lambda to execute within the tracing context
+     * @param <T>      callable's return type
+     * @return value from the callable
+     */
+    public static <T> T continueTracing(Context context, Callable<T> callable) {
+        try {
+            return callable.call();
+        } catch (Exception e) {
+            return sneakyThrow(e); // rethrows e unchanged (even when checked) instead of wrapping it
+        } 
+    }
+
+    /**
+     * Runs a callable; the agent's toolkit-webflux plugin wraps this call with a local span continuing the trace.
+     *
+     * @param serverWebExchange  - exchange (possibly decorated) whose enhanced instance carries the tracing context
+     * @param callable - lambda to execute within the tracing context
+     * @param <T>      callable's return type
+     * @return value from the callable
+     */
+    public static <T> T continueTracing(ServerWebExchange serverWebExchange, Callable<T> callable) {
+        try {
+            return callable.call();
+        } catch (Exception e) {
+            return sneakyThrow(e); // rethrows e unchanged (even when checked) instead of wrapping it
+        }
+    }
+
+    /**
+     * Runs a runnable; the agent's toolkit-webflux plugin wraps this call with a local span continuing the trace.
+     *
+     * @param serverWebExchange  - exchange (possibly decorated) whose enhanced instance carries the tracing context
+     * @param runnable - lambda to execute within the tracing context
+     */
+    public static void continueTracing(ServerWebExchange serverWebExchange, Runnable runnable) {
+        runnable.run(); // serverWebExchange is not used here; it is read by the agent's interceptor
+    }
+
+    private static <T extends Throwable, R> R sneakyThrow(Throwable t) throws T {
+        throw (T) t; // the generic cast is erased at runtime, so t is thrown as whatever type it already is
+    }
+}
diff --git a/apm-application-toolkit/pom.xml b/apm-application-toolkit/pom.xml
index 3733377c47..4444989719 100644
--- a/apm-application-toolkit/pom.xml
+++ b/apm-application-toolkit/pom.xml
@@ -35,5 +35,6 @@
         <module>apm-toolkit-meter</module>
         <module>apm-toolkit-micrometer-registry</module>
         <module>apm-toolkit-kafka</module>
+        <module>apm-toolkit-webflux</module>
     </modules>
 </project>
diff --git a/apm-application-toolkit/pom.xml b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/pom.xml
similarity index 64%
copy from apm-application-toolkit/pom.xml
copy to apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/pom.xml
index 3733377c47..c9d473b86e 100644
--- a/apm-application-toolkit/pom.xml
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/pom.xml
@@ -1,3 +1,4 @@
+<?xml version="1.0" encoding="UTF-8"?>
 <!--
   ~ Licensed to the Apache Software Foundation (ASF) under one or more
   ~ contributor license agreements.  See the NOTICE file distributed with
@@ -16,24 +17,26 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>java-agent</artifactId>
+        <artifactId>apm-toolkit-activation</artifactId>
         <groupId>org.apache.skywalking</groupId>
         <version>8.13.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>apm-application-toolkit</artifactId>
-    <packaging>pom</packaging>
 
-    <modules>
-        <module>apm-toolkit-log4j-1.x</module>
-        <module>apm-toolkit-log4j-2.x</module>
-        <module>apm-toolkit-logback-1.x</module>
-        <module>apm-toolkit-opentracing</module>
-        <module>apm-toolkit-trace</module>
-        <module>apm-toolkit-meter</module>
-        <module>apm-toolkit-micrometer-registry</module>
-        <module>apm-toolkit-kafka</module>
-    </modules>
+    <artifactId>apm-toolkit-webflux-activation</artifactId>
+
+    <properties>
+        <spring-webflux.version>5.1.0.RELEASE</spring-webflux.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-webflux</artifactId>
+            <version>${spring-webflux.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/webflux/WebFluxSkyWalkingOperatorsActivation.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/webflux/WebFluxSkyWalkingOperatorsActivation.java
new file mode 100644
index 0000000000..09d9b81b0a
--- /dev/null
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/webflux/WebFluxSkyWalkingOperatorsActivation.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.toolkit.activation.webflux;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassStaticMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/** Plugin definition that hands the static {@code continueTracing} overloads of {@link #ENHANCE_CLASS} whose first
+ * of two arguments is a Reactor {@code Context} or a {@code ServerWebExchange} to {@link #INTERCEPT_CLASS}. */
+public class WebFluxSkyWalkingOperatorsActivation extends ClassStaticMethodsEnhancePluginDefine {
+
+    public static final String INTERCEPT_CLASS =
+            "org.apache.skywalking.apm.toolkit.activation.webflux.WebFluxSkyWalkingOperatorsInterceptor";
+    public static final String ENHANCE_CLASS =
+            "org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators";
+    public static final String ENHANCE_METHOD = "continueTracing";
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(ENHANCE_CLASS); // exact match on the toolkit class name
+    }
+
+    @Override
+    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return null; // no constructor intercept points are declared
+    }
+
+    @Override
+    public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
+        return new StaticMethodsInterceptPoint[] {
+                new StaticMethodsInterceptPoint() {
+                    @Override
+                    public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                        // only the two-argument overloads, selected by the type of their first argument
+                        return named(ENHANCE_METHOD).and(takesArguments(2))
+                                .and(takesArgument(0, named("reactor.util.context.Context"))
+                                        .or(takesArgument(0, named("org.springframework.web.server.ServerWebExchange"))));
+                    }
+                    @Override
+                    public String getMethodsInterceptor() {
+                        return INTERCEPT_CLASS;
+                    }
+
+                    @Override
+                    public boolean isOverrideArgs() {
+                        return false; // the arguments are passed through unchanged
+                    }
+                }
+        };
+    }
+}
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/webflux/WebFluxSkyWalkingOperatorsInterceptor.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/webflux/WebFluxSkyWalkingOperatorsInterceptor.java
new file mode 100644
index 0000000000..679710700f
--- /dev/null
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/webflux/WebFluxSkyWalkingOperatorsInterceptor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.toolkit.activation.webflux;
+
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.springframework.web.server.ServerWebExchange;
+import org.springframework.web.server.ServerWebExchangeDecorator;
+import org.springframework.web.server.adapter.DefaultServerWebExchange;
+import reactor.util.context.Context;
+
+import java.lang.reflect.Method;
+
+/** Opens a "WebFluxOperators/onNext" local span that continues the snapshot found through the first argument of
+ * {@code continueTracing(Context|ServerWebExchange, ...)}, and stops it again once the intercepted call is over. */
+public class WebFluxSkyWalkingOperatorsInterceptor implements StaticMethodsAroundInterceptor {
+    @Override
+    public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+                             MethodInterceptResult result) {
+        // get ContextSnapshot from reactor context, the snapshot is set to reactor context by any other plugin
+        // such as DispatcherHandlerHandleMethodInterceptor in spring-webflux-5.x-plugin
+        if (parameterTypes[0] == Context.class) {
+            ((Context) allArguments[0]).getOrEmpty("SKYWALKING_CONTEXT_SNAPSHOT")
+                    .ifPresent(ctx -> {
+                        ContextManager.createLocalSpan("WebFluxOperators/onNext").setComponent(ComponentsDefine.SPRING_WEBFLUX);
+                        ContextManager.continued((ContextSnapshot) ctx);
+                    });
+        } else if (parameterTypes[0] == ServerWebExchange.class) {
+            EnhancedInstance instance = getInstance(allArguments[0]); // the snapshot is read from its dynamic field
+            if (instance != null && instance.getSkyWalkingDynamicField() != null) {
+                ContextManager.createLocalSpan("WebFluxOperators/onNext").setComponent(ComponentsDefine.SPRING_WEBFLUX);
+                ContextManager.continued((ContextSnapshot) instance.getSkyWalkingDynamicField());
+            }
+        }
+    }
+
+    @Override
+    public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes, Object ret) {
+        if (ContextManager.isActive()) { // beforeMethod opens no span when it finds no snapshot
+            ContextManager.stopSpan();
+        }
+        return ret;
+    }
+
+    @Override
+    public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+                                      Throwable t) {
+        if (ContextManager.isActive()) { // without a tracing context there is no span to log the error on
+            ContextManager.activeSpan().log(t);
+        }
+    }
+
+    // Returns the agent-enhanced DefaultServerWebExchange behind any ServerWebExchangeDecorator layers, or null.
+    private static EnhancedInstance getInstance(Object o) {
+        if (o instanceof DefaultServerWebExchange && o instanceof EnhancedInstance) {
+            return (EnhancedInstance) o;
+        }
+        return o instanceof ServerWebExchangeDecorator ? getInstance(((ServerWebExchangeDecorator) o).getDelegate()) : null;
+    }
+}
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/resources/skywalking-plugin.def
new file mode 100644
index 0000000000..de46b3ce11
--- /dev/null
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-webflux-activation/src/main/resources/skywalking-plugin.def
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+toolkit-webflux=org.apache.skywalking.apm.toolkit.activation.webflux.WebFluxSkyWalkingOperatorsActivation
\ No newline at end of file
diff --git a/apm-sniffer/apm-toolkit-activation/pom.xml b/apm-sniffer/apm-toolkit-activation/pom.xml
index b299999d23..1d7ca145ee 100644
--- a/apm-sniffer/apm-toolkit-activation/pom.xml
+++ b/apm-sniffer/apm-toolkit-activation/pom.xml
@@ -33,6 +33,7 @@
         <module>apm-toolkit-opentracing-activation</module>
         <module>apm-toolkit-trace-activation</module>
         <module>apm-toolkit-meter-activation</module>
+        <module>apm-toolkit-webflux-activation</module>
         <module>apm-toolkit-logging-common</module>
     </modules>
 
diff --git a/docs/en/setup/service-agent/java-agent/Application-toolkit-kafka.md b/docs/en/setup/service-agent/java-agent/Application-toolkit-kafka.md
new file mode 100644
index 0000000000..890fac223d
--- /dev/null
+++ b/docs/en/setup/service-agent/java-agent/Application-toolkit-kafka.md
@@ -0,0 +1,55 @@
+# Kafka Poll And Invoke
+* Add the toolkit as a dependency, e.g. with Maven or Gradle
+
+```xml
+   <dependency>
+      <groupId>org.apache.skywalking</groupId>
+      <artifactId>apm-toolkit-kafka</artifactId>
+      <version>${skywalking.version}</version>
+   </dependency>
+```
+
+* usage 1.
+```java
+   public class ConsumerThread2 extends Thread {
+    @Override
+    public void run() {
+        Properties consumerProperties = new Properties();
+        //...consumerProperties.put()
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
+        consumer.subscribe(topicPattern, new NoOpConsumerRebalanceListener());
+        while (true) {
+            if (pollAndInvoke(consumer)) break;
+        }
+        consumer.close();
+    }
+
+    @KafkaPollAndInvoke
+    private boolean pollAndInvoke(KafkaConsumer<String, String> consumer) {
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) { // sample only: the interruption is ignored
+        }
+
+        ConsumerRecords<String, String> records = consumer.poll(100);
+
+        if (!records.isEmpty()) {
+            OkHttpClient client = new OkHttpClient.Builder().build();
+            Request request = new Request.Builder().url("http://localhost:8080/kafka-scenario/case/kafka-thread2-ping").build();
+            Response response = null;
+            try {
+                response = client.newCall(request).execute();
+                response.body().close(); // closed inside the try: response is still null when execute() fails
+            } catch (IOException e) { // sample only: a failed call is ignored
+            }
+            return true;
+        }
+        return false;
+    }
+}
+```
+
+_Sample code only_
+
+
+
diff --git a/docs/en/setup/service-agent/java-agent/Application-toolkit-webflux.md b/docs/en/setup/service-agent/java-agent/Application-toolkit-webflux.md
new file mode 100644
index 0000000000..32a2662384
--- /dev/null
+++ b/docs/en/setup/service-agent/java-agent/Application-toolkit-webflux.md
@@ -0,0 +1,52 @@
+# Manual propagation of tracing context for Webflux
+* Add the toolkit as a dependency, e.g. with Maven or Gradle
+```xml
+   <dependency>
+      <groupId>org.apache.skywalking</groupId>
+      <artifactId>apm-toolkit-webflux</artifactId>
+      <version>${skywalking.version}</version>
+   </dependency>
+```
+
+* usage 1.
+```java
+    @GetMapping("/testcase/annotation/mono/onnext") 
+    public Mono<String> monoOnNext(@RequestBody(required = false) String body) {
+        return Mono.subscriberContext()
+            .flatMap(ctx -> WebFluxSkyWalkingOperators.continueTracing(ctx, () -> {
+                visit("http://localhost:" + serverPort + "/testcase/success");
+                return Mono.just("Hello World");
+            }));
+    }
+```
+* usage 2.
+```java
+    @GetMapping("/login/userFunctions")
+    public Mono<Response<FunctionInfoResult>> functionInfo(ServerWebExchange exchange, @RequestParam String userId) {
+        return ReactiveSecurityContextHolder.getContext()
+            .flatMap(context ->  {
+                return exchange.getSession().map(session -> WebFluxSkyWalkingOperators.continueTracing(exchange, () -> handle(session, userId)));
+            });
+    }
+
+    private Response<FunctionInfoResult> handle(WebSession session, String userId) {
+        //...dubbo rpc    
+    }
+```
+
+* usage 3.
+```java
+    Mono.just("key").subscribeOn(Schedulers.boundedElastic())
+        .doOnEach(WebFluxSkyWalkingOperators.continueTracing(SignalType.ON_NEXT, () -> log.info("test log with tid")))
+        .flatMap(key -> Mono.deferContextual(ctx -> WebFluxSkyWalkingOperators.continueTracing(Context.of(ctx), () -> {
+                redis.hasKey(key);
+                return Mono.just("SUCCESS");
+            })
+        ));
+...
+```
+
+_Sample code only_
+
+
+
diff --git a/docs/en/setup/service-agent/java-agent/Plugin-list.md b/docs/en/setup/service-agent/java-agent/Plugin-list.md
index cf387807e9..1390befff3 100644
--- a/docs/en/setup/service-agent/java-agent/Plugin-list.md
+++ b/docs/en/setup/service-agent/java-agent/Plugin-list.md
@@ -122,6 +122,7 @@
 - toolkit-tag
 - toolkit-trace
 - toolkit-exception
+- toolkit-webflux
 - undertow-2.x-plugin
 - vertx-core-3.x
 - vertx-core-4.x
diff --git a/docs/menu.yml b/docs/menu.yml
index 9a56155bbd..13ca93a3b5 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -46,6 +46,10 @@ catalog:
             path: "/en/setup/service-agent/java-agent/application-toolkit-trace-cross-thread"
           - name: "MicroMeter Registry"
             path: "/en/setup/service-agent/java-agent/application-toolkit-micrometer"
+          - name: "Webflux Tracing Assistant APIs"
+            path: "/en/setup/service-agent/java-agent/application-toolkit-webflux"
+          - name: "Kafka Tracing Assistant APIs"
+            path: "/en/setup/service-agent/java-agent/application-toolkit-kafka"  
       - name: "Tolerate Custom Exceptions"
         path: "/en/setup/service-agent/java-agent/how-to-tolerate-exceptions"
       - name: "Log & Trace Correlation"
diff --git a/test/plugin/scenarios/webflux-scenario/config/expectedData.yaml b/test/plugin/scenarios/webflux-scenario/config/expectedData.yaml
index abea127316..3dc9ce8d17 100644
--- a/test/plugin/scenarios/webflux-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/webflux-scenario/config/expectedData.yaml
@@ -184,7 +184,81 @@ segmentItems:
       - {parentEndpoint: GET:/projectA/testcase, networkAddress: 'localhost:18080', refType: CrossProcess,
         parentSpanId: 8, parentTraceSegmentId: not null, parentServiceInstance: not
           null, parentService: not null, traceId: not null}
+      skipAnalysis: 'false'  
+  - segmentId: not null
+    spans:
+    - operationName: /testcase/success
+      parentSpanId: -1
+      spanId: 0
+      spanLayer: Http
+      startTime: not null
+      endTime: not null
+      componentId: 67
+      isError: false
+      spanType: Entry
+      peer: ''
+      skipAnalysis: 'false'
+      tags:
+      - {key: url, value: 'http://localhost:18080/testcase/success'}
+      - {key: http.method, value: GET}
+      - {key: http.status_code, value: '200'}    
+      refs:
+      - {parentEndpoint: WebFluxOperators/onNext, networkAddress: 'localhost:18080', refType: CrossProcess,
+        parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not
+          null, parentService: not null, traceId: not null}
+  - segmentId: not null
+    spans:
+    - operationName: /testcase/success
+      parentSpanId: 0
+      spanId: 1
+      spanLayer: Http
+      startTime: not null
+      endTime: not null
+      componentId: 2
+      isError: false
+      spanType: Exit
+      peer: not null
       skipAnalysis: 'false'
+      tags:
+      - {key: url, value: 'http://localhost:18080/testcase/success'}
+      - {key: http.method, value: GET}
+      - {key: http.status_code, value: '200'}
+    - operationName: WebFluxOperators/onNext
+      parentSpanId: -1
+      spanId: 0
+      spanLayer: Unknown
+      startTime: not null
+      endTime: not null
+      componentId: 67
+      isError: false
+      spanType: Local
+      peer: ''
+      refs:
+      - {parentEndpoint: /testcase/annotation/mono/onnext, networkAddress: '', refType: CrossThread,
+        parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not
+          null, parentService: not null, traceId: not null}
+      skipAnalysis: 'false'       
+  - segmentId: not null
+    spans:
+    - operationName: /testcase/annotation/mono/onnext
+      parentSpanId: -1
+      spanId: 0
+      spanLayer: Http
+      startTime: not null
+      endTime: not null
+      componentId: 67
+      isError: false
+      spanType: Entry
+      peer: ''
+      tags:
+      - {key: url, value: 'http://localhost:18080/testcase/annotation/mono/onnext'}
+      - {key: http.method, value: GET}
+      - {key: http.status_code, value: '200'}
+      refs:
+      - {parentEndpoint: 'GET:/projectA/testcase', networkAddress: 'localhost:18080', refType: CrossProcess,
+        parentSpanId: 9, parentTraceSegmentId: not null, parentServiceInstance: not
+          null, parentService: not null, traceId: not null}
+      skipAnalysis: 'false'     
   - segmentId: not null
     spans:
     - operationName: /testcase/webclient/server
@@ -203,7 +277,7 @@ segmentItems:
       - {key: http.status_code, value: '200'}
       refs:
       - {parentEndpoint: GET:/projectA/testcase, networkAddress: 'localhost:18080', refType: CrossProcess,
-        parentSpanId: 9, parentTraceSegmentId: not null, parentServiceInstance: not
+        parentSpanId: 10, parentTraceSegmentId: not null, parentServiceInstance: not
           null, parentService: not null, traceId: not null}
       skipAnalysis: 'false'
 - serviceName: webflux-projectA-scenario
@@ -331,12 +405,27 @@ segmentItems:
       - {key: http.method, value: GET}
       - {key: http.status_code, value: '200'}
       skipAnalysis: 'false'
-    - operationName: /testcase/webclient/server
+    - operationName: /testcase/annotation/mono/onnext
       parentSpanId: 0
       spanId: 9
       spanLayer: Http
       startTime: not null
       endTime: not null
+      componentId: 2
+      isError: false
+      spanType: Exit
+      peer: not null
+      tags:
+      - {key: url, value: not null}
+      - {key: http.method, value: GET}
+      - {key: http.status_code, value: '200'}
+      skipAnalysis: 'false'
+    - operationName: /testcase/webclient/server
+      parentSpanId: 0
+      spanId: 10
+      spanLayer: Http
+      startTime: not null
+      endTime: not null
       componentId: 99
       isError: false
       spanType: Exit
diff --git a/test/plugin/scenarios/webflux-scenario/webflux-projectA-scenario/src/main/java/org/apache/skywalking/apm/testcase/sc/webflux/projectA/controller/TestController.java b/test/plugin/scenarios/webflux-scenario/webflux-projectA-scenario/src/main/java/org/apache/skywalking/apm/testcase/sc/webflux/projectA/controller/TestController.java
index b11f74373b..45eca4b9a7 100644
--- a/test/plugin/scenarios/webflux-scenario/webflux-projectA-scenario/src/main/java/org/apache/skywalking/apm/testcase/sc/webflux/projectA/controller/TestController.java
+++ b/test/plugin/scenarios/webflux-scenario/webflux-projectA-scenario/src/main/java/org/apache/skywalking/apm/testcase/sc/webflux/projectA/controller/TestController.java
@@ -46,6 +46,7 @@ public class TestController {
         visit("http://" + hostBAddress + "/testcase/route/error");
         visit("http://" + hostBAddress + "/notFound");
         visit("http://" + hostBAddress + "/testcase/annotation/mono/hello");
+        visit("http://" + hostBAddress + "/testcase/annotation/mono/onnext");
         testGet("http://" + hostBAddress + "/testcase/webclient/server");
         return "test";
     }
diff --git a/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/pom.xml b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/pom.xml
index 792dc6df9a..79bc16984b 100644
--- a/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/pom.xml
+++ b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/pom.xml
@@ -34,6 +34,11 @@
             <artifactId>spring-boot-starter-webflux</artifactId>
             <version>${test.framework.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.6</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/org/apache/skywalking/apm/toolkit/webflux/WebFluxSkyWalkingOperators.java b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/org/apache/skywalking/apm/toolkit/webflux/WebFluxSkyWalkingOperators.java
new file mode 100644
index 0000000000..73b507bfbd
--- /dev/null
+++ b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/org/apache/skywalking/apm/toolkit/webflux/WebFluxSkyWalkingOperators.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.toolkit.webflux;
+
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Signal;
+import reactor.core.publisher.SignalType;
+import reactor.util.context.Context;
+
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+/**
+ * WebFlux operators that are capable of reusing the tracing context from Reactor's Context.
+ */
+public final class WebFluxSkyWalkingOperators {
+
+    private WebFluxSkyWalkingOperators() {
+        throw new IllegalStateException("You can't instantiate a utility class");
+    }
+
+    /**
+     * Builds a signal consumer that runs the runnable only when the signal matches {@code signalType}.
+     * Matching signals are handed to {@link #continueTracing(Runnable)}; all others are ignored.
+     * @param signalType - Reactor's signal type
+     * @param runnable   - lambda to execute within the tracing context
+     * @return consumer of a signal
+     */
+    public static Consumer<Signal<?>> continueTracing(SignalType signalType, Runnable runnable) {
+        return signal -> {
+            if (signalType != signal.getType()) {
+                return;
+            }
+            continueTracing(runnable).accept(signal);
+        };
+    }
+
+    /**
+     * Builds a signal consumer that passes the signal to {@code consumer} only when it matches
+     * {@code signalType}, running it through {@link #continueTracing(Context, Runnable)}.
+     * @param signalType - Reactor's signal type
+     * @param consumer   - lambda to execute within the tracing context
+     * @return consumer of a signal
+     */
+    public static Consumer<Signal> continueTracing(SignalType signalType, Consumer<Signal> consumer) {
+        return signal -> {
+            if (signalType != signal.getType()) {
+                return;
+            }
+            continueTracing(signal.getContext(), () -> consumer.accept(signal));
+        };
+    }
+
+    /**
+     * Builds a signal consumer that runs the runnable with the Reactor context taken from the signal.
+     *
+     * @param runnable - lambda to execute within the tracing context
+     * @return consumer of a signal
+     */
+    public static Consumer<Signal> continueTracing(Runnable runnable) {
+        return signal -> {
+            Context context = signal.getContext();
+            continueTracing(context, runnable);
+        };
+    }
+
+    /**
+     * Runs the runnable; as written here the body only calls {@code run()} and never reads {@code context}.
+     * NOTE(review): span creation is presumably woven in by the agent's toolkit activation - confirm.
+     * @param context  - Reactor context that contains the tracing context
+     * @param runnable - lambda to execute within the tracing context
+     */
+    public static void continueTracing(Context context, Runnable runnable) {
+        runnable.run();
+    }
+
+    /**
+     * Calls the callable and returns its result; any exception is rethrown unchanged via sneakyThrow.
+     *
+     * @param context  - Reactor context that contains the tracing context (not read by this body)
+     * @param callable - lambda to execute within the tracing context
+     * @param <T>      callable's return type
+     * @return value from the callable
+     */
+    public static <T> T continueTracing(Context context, Callable<T> callable) {
+        try {
+            return callable.call();
+        } catch (Exception e) {
+            return sneakyThrow(e);
+        }
+    }
+
+    /**
+     * Calls the callable and returns its result; any exception is rethrown unchanged via sneakyThrow.
+     *
+     * @param serverWebExchange  - EnhancedInstance that contains the tracing context (not read by this body)
+     * @param callable - lambda to execute within the tracing context
+     * @param <T>      callable's return type
+     * @return value from the callable
+     */
+    public static <T> T continueTracing(ServerWebExchange serverWebExchange, Callable<T> callable) {
+        try {
+            return callable.call();
+        } catch (Exception e) {
+            return sneakyThrow(e);
+        }
+    }
+
+    /**
+     * Runs the runnable; this body only calls {@code run()} and never reads {@code serverWebExchange}.
+     *
+     * @param serverWebExchange  - EnhancedInstance that contains the tracing context
+     * @param runnable - lambda to execute within the tracing context
+     */
+    public static void continueTracing(ServerWebExchange serverWebExchange, Runnable runnable) {
+        runnable.run();
+    }
+
+    private static <T extends Throwable, R> R sneakyThrow(Throwable t) throws T {
+        throw (T) t; // rethrows t as-is, so checked exceptions escape without being declared
+    }
+}
diff --git a/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/webflux/projectB/controller/TestAnnotationController.java b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/webflux/projectB/controller/TestAnnotationController.java
index b391f40bed..f487ec5f28 100644
--- a/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/webflux/projectB/controller/TestAnnotationController.java
+++ b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/webflux/projectB/controller/TestAnnotationController.java
@@ -17,6 +17,8 @@
 
 package test.apache.skywalking.apm.testcase.sc.webflux.projectB.controller;
 
+import org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -24,9 +26,14 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 import reactor.core.publisher.Mono;
 
+import static test.apache.skywalking.apm.testcase.sc.webflux.projectB.utils.HttpUtils.visit;
+
 @RestController
 public class TestAnnotationController {
 
+    @Value("${server.port:18080}")
+    private String serverPort;
+    
     @RequestMapping("/testcase/annotation/healthCheck")
     public String healthCheck() {
         return "healthCheck";
@@ -59,4 +66,18 @@ public class TestAnnotationController {
     public Mono<String> hello(@RequestBody(required = false) String body) {
         return Mono.just("Hello World");
     }
+
+    @RequestMapping("/testcase/success")
+    public String downstreamMock() { // endpoint that monoOnNext requests over HTTP on this same service
+        return "1";
+    }
+
+    @GetMapping("/testcase/annotation/mono/onnext")
+    public Mono<String> monoOnNext(@RequestBody(required = false) String body) { // body is not used below
+        return Mono.subscriberContext() // obtain the Reactor context of the current subscription
+                .flatMap(ctx -> WebFluxSkyWalkingOperators.continueTracing(ctx, () -> {
+                    visit("http://localhost:" + serverPort + "/testcase/success"); // HTTP call back to this app
+                    return Mono.just("Hello World");
+                }));
+    }
 }
diff --git a/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/webflux/projectB/utils/HttpUtils.java b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/webflux/projectB/utils/HttpUtils.java
new file mode 100644
index 0000000000..e79afc034a
--- /dev/null
+++ b/test/plugin/scenarios/webflux-scenario/webflux-projectB-scenario/src/main/java/test/apache/skywalking/apm/testcase/sc/webflux/projectB/utils/HttpUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package test.apache.skywalking.apm.testcase.sc.webflux.projectB.utils;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+
+public class HttpUtils {
+
+    /**
+     * GETs {@code url}; returns the body as text, or {@code null} if the response has no entity.
+     */
+    public static String visit(String url) throws IOException {
+        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+            HttpGet httpget = new HttpGet(url);
+            ResponseHandler<String> responseHandler = response -> {
+                HttpEntity entity = response.getEntity();
+                return entity != null ? EntityUtils.toString(entity) : null;
+            };
+            return httpclient.execute(httpget, responseHandler);
+        }
+    }
+}