You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/11/01 13:37:04 UTC

[skywalking-java] branch main updated: [Enhance]For trace CompletableFuture thenApply or thenAccept (#60)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git


The following commit(s) were added to refs/heads/main by this push:
     new 1308d3c  [Enhance]For trace CompletableFuture thenApply or thenAccept (#60)
1308d3c is described below

commit 1308d3c0ae360cf4632f87363365f22c4c6fc89f
Author: will2020-power <63...@users.noreply.github.com>
AuthorDate: Mon Nov 1 21:36:34 2021 +0800

    [Enhance]For trace CompletableFuture thenApply or thenAccept (#60)
---
 CHANGES.md                                         |   1 +
 .../apm/toolkit/trace/ConsumerWrapper.java         |  40 ++++++++
 .../apm/toolkit/trace/FunctionWrapper.java         |  40 ++++++++
 .../trace/CallableOrRunnableActivation.java        |   6 +-
 .../Application-toolkit-trace-cross-thread.md      |  18 ++++
 .../config/expectedData.yaml                       | 107 ++++++++++++++++++++-
 .../apm/toolkit/trace/ConsumerWrapper.java         |  40 ++++++++
 .../apm/toolkit/trace/FunctionWrapper.java         |  40 ++++++++
 .../toolkit/controller/TestController.java         |  43 ++++++++-
 .../testcase/toolkit/controller/TestService.java   |  12 +++
 10 files changed, 344 insertions(+), 3 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index beded13..bc3cedb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,6 +6,7 @@ Release Notes.
 ------------------
 
 * Support `Transaction` and fix duplicated methods enhancements for `jedis-2.x` plugin.
+* Add `ConsumerWrapper`/`FunctionWrapper` to support tracing `CompletableFuture` callbacks passed to `thenAccept`/`thenApply` and their `Async` variants.
 
 #### Documentation
 
diff --git a/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/ConsumerWrapper.java b/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/ConsumerWrapper.java
new file mode 100644
index 0000000..9d05ef1
--- /dev/null
+++ b/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/ConsumerWrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+ 
+package org.apache.skywalking.apm.toolkit.trace;
+
+import java.util.function.Consumer;
+
+/** {@link Consumer} decorator marked {@code @TraceCrossThread}; delegates every call to the wrapped consumer. */
+@TraceCrossThread
+public class ConsumerWrapper<V> implements Consumer<V> {
+    final Consumer<V> consumer;
+
+    public ConsumerWrapper(Consumer<V> consumer) {
+        this.consumer = consumer;
+    }
+
+    public static <V> ConsumerWrapper<V> of(Consumer<V> consumer) {
+        return new ConsumerWrapper<>(consumer); // diamond: parameterized instance, no raw type
+    }
+
+    @Override
+    public void accept(V v) {
+        this.consumer.accept(v);
+    }
+}
diff --git a/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/FunctionWrapper.java b/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/FunctionWrapper.java
new file mode 100644
index 0000000..33878c6
--- /dev/null
+++ b/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/FunctionWrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.toolkit.trace;
+
+import java.util.function.Function;
+
+/** {@link Function} decorator marked {@code @TraceCrossThread}; delegates every call to the wrapped function. */
+@TraceCrossThread
+public class FunctionWrapper<T, R> implements Function<T, R> {
+    final Function<T, R> function;
+
+    public FunctionWrapper(Function<T, R> function) {
+        this.function = function;
+    }
+
+    public static <T, R> FunctionWrapper<T, R> of(Function<T, R> function) {
+        return new FunctionWrapper<>(function); // diamond: parameterized instance, no raw type
+    }
+
+    @Override
+    public R apply(T t) {
+        return this.function.apply(t);
+    }
+}
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/CallableOrRunnableActivation.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/CallableOrRunnableActivation.java
index 1584376..b3a6414 100644
--- a/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/CallableOrRunnableActivation.java
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/CallableOrRunnableActivation.java
@@ -41,6 +41,8 @@ public class CallableOrRunnableActivation extends ClassInstanceMethodsEnhancePlu
     private static final String CALL_METHOD_NAME = "call";
     private static final String RUN_METHOD_NAME = "run";
     private static final String GET_METHOD_NAME = "get";
+    private static final String APPLY_METHOD_NAME = "apply";
+    private static final String ACCEPT_METHOD_NAME = "accept";
 
     @Override
     public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
@@ -68,7 +70,9 @@ public class CallableOrRunnableActivation extends ClassInstanceMethodsEnhancePlu
                     return named(CALL_METHOD_NAME)
                         .and(takesArguments(0))
                         .or(named(RUN_METHOD_NAME).and(takesArguments(0)))
-                        .or(named(GET_METHOD_NAME).and(takesArguments(0)));
+                        .or(named(GET_METHOD_NAME).and(takesArguments(0)))
+                        .or(named(APPLY_METHOD_NAME).and(takesArguments(1)))
+                        .or(named(ACCEPT_METHOD_NAME).and(takesArguments(1)));
                 }
 
                 @Override
diff --git a/docs/en/setup/service-agent/java-agent/Application-toolkit-trace-cross-thread.md b/docs/en/setup/service-agent/java-agent/Application-toolkit-trace-cross-thread.md
index cfd171b..a65424a 100644
--- a/docs/en/setup/service-agent/java-agent/Application-toolkit-trace-cross-thread.md
+++ b/docs/en/setup/service-agent/java-agent/Application-toolkit-trace-cross-thread.md
@@ -57,6 +57,24 @@ or
             return "SupplierWrapper";
     })).thenAccept(System.out::println);
 ```
+* usage 4.
+```java
+    CompletableFuture.supplyAsync(SupplierWrapper.of(() -> {
+        return "SupplierWrapper";
+    })).thenAcceptAsync(ConsumerWrapper.of(c -> {
+        // your code visit(url)
+        System.out.println("ConsumerWrapper");
+    }));
+```
+or 
+```java
+    CompletableFuture.supplyAsync(SupplierWrapper.of(() -> {
+        return "SupplierWrapper";
+    })).thenApplyAsync(FunctionWrapper.of(f -> {
+        // your code visit(url)
+        return "FunctionWrapper";
+    }));
+```
 _Sample codes only_
 
 
diff --git a/test/plugin/scenarios/apm-toolkit-trace-scenario/config/expectedData.yaml b/test/plugin/scenarios/apm-toolkit-trace-scenario/config/expectedData.yaml
index 2ae6fb0..71ae848 100644
--- a/test/plugin/scenarios/apm-toolkit-trace-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/apm-toolkit-trace-scenario/config/expectedData.yaml
@@ -334,7 +334,112 @@ segmentItems:
         parentTraceSegmentId: not null, parentServiceInstance: not null, parentService: apm-toolkit-trace-scenario,
         traceId: not null}
       skipAnalysis: 'true'
-
+  - segmentId: not null
+    spans:
+    - operationName: /apm-toolkit-trace-scenario/case/asyncVisit/consumer
+      parentSpanId: 0
+      spanId: 1
+      spanLayer: Http
+      startTime: nq 0
+      endTime: nq 0
+      componentId: 2
+      isError: false
+      spanType: Exit
+      peer: localhost:8080
+      tags:
+      - {key: url, value: 'http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/consumer'}
+      - {key: http.method, value: GET}
+      skipAnalysis: 'true'
+    - operationName: Thread/org.apache.skywalking.apm.toolkit.trace.ConsumerWrapper/accept
+      parentSpanId: -1
+      spanId: 0
+      spanLayer: Unknown
+      startTime: nq 0
+      endTime: nq 0
+      componentId: 0
+      isError: false
+      spanType: Local
+      peer: ''
+      refs:
+      - {parentEndpoint: GET:/case/tool-kit, networkAddress: '', refType: CrossThread,
+        parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not
+          null, parentService: apm-toolkit-trace-scenario, traceId: not null}
+      skipAnalysis: 'true'
+  - segmentId: not null
+    spans:
+    - operationName: GET:/case/asyncVisit/function
+      parentSpanId: -1
+      spanId: 0
+      spanLayer: Http
+      startTime: nq 0
+      endTime: nq 0
+      componentId: 14
+      isError: false
+      spanType: Entry
+      peer: ''
+      tags:
+      - {key: url, value: 'http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/function'}
+      - {key: http.method, value: GET}
+      - {key: correlation, value: correlationValueTest}
+      refs:
+      - {parentEndpoint: Thread/org.apache.skywalking.apm.toolkit.trace.FunctionWrapper/apply,
+        networkAddress: 'localhost:8080', refType: CrossProcess, parentSpanId: 1,
+        parentTraceSegmentId: not null, parentServiceInstance: not null, parentService: apm-toolkit-trace-scenario,
+        traceId: not null}
+      skipAnalysis: 'true'
+  - segmentId: not null
+    spans:
+    - operationName: /apm-toolkit-trace-scenario/case/asyncVisit/function
+      parentSpanId: 0
+      spanId: 1
+      spanLayer: Http
+      startTime: nq 0
+      endTime: nq 0
+      componentId: 2
+      isError: false
+      spanType: Exit
+      peer: localhost:8080
+      tags:
+      - {key: url, value: 'http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/function'}
+      - {key: http.method, value: GET}
+      skipAnalysis: 'true'
+    - operationName: Thread/org.apache.skywalking.apm.toolkit.trace.FunctionWrapper/apply
+      parentSpanId: -1
+      spanId: 0
+      spanLayer: Unknown
+      startTime: nq 0
+      endTime: nq 0
+      componentId: 0
+      isError: false
+      spanType: Local
+      peer: ''
+      refs:
+      - {parentEndpoint: GET:/case/tool-kit, networkAddress: '', refType: CrossThread,
+        parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not
+          null, parentService: apm-toolkit-trace-scenario, traceId: not null}
+      skipAnalysis: 'true'
+  - segmentId: not null
+    spans:
+    - operationName: GET:/case/asyncVisit/consumer
+      parentSpanId: -1
+      spanId: 0
+      spanLayer: Http
+      startTime: nq 0
+      endTime: nq 0
+      componentId: 14
+      isError: false
+      spanType: Entry
+      peer: ''
+      tags:
+      - {key: url, value: 'http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/consumer'}
+      - {key: http.method, value: GET}
+      - {key: correlation, value: correlationValueTest}
+      refs:
+      - {parentEndpoint: Thread/org.apache.skywalking.apm.toolkit.trace.ConsumerWrapper/accept,
+        networkAddress: 'localhost:8080', refType: CrossProcess, parentSpanId: 1,
+        parentTraceSegmentId: not null, parentServiceInstance: not null, parentService: apm-toolkit-trace-scenario,
+        traceId: not null}
+      skipAnalysis: 'true'
 meterItems:
 - serviceName: apm-toolkit-trace-scenario
   meterSize: 3
diff --git a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/ConsumerWrapper.java b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/ConsumerWrapper.java
new file mode 100644
index 0000000..02ca819
--- /dev/null
+++ b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/ConsumerWrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+ 
+package org.apache.skywalking.apm.toolkit.trace;
+
+import java.util.function.Consumer;
+
+/** {@link Consumer} decorator marked {@code @TraceCrossThread}; delegates every call to the wrapped consumer. */
+@TraceCrossThread
+public class ConsumerWrapper<V> implements Consumer<V> {
+    final Consumer<V> consumer;
+
+    public ConsumerWrapper(Consumer<V> consumer) {
+        this.consumer = consumer;
+    }
+
+    public static <V> ConsumerWrapper<V> of(Consumer<V> consumer) {
+        return new ConsumerWrapper<>(consumer); // diamond: parameterized instance, no raw type
+    }
+
+    @Override
+    public void accept(V v) {
+        this.consumer.accept(v);
+    }
+}
diff --git a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/FunctionWrapper.java b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/FunctionWrapper.java
new file mode 100644
index 0000000..7cd780c
--- /dev/null
+++ b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/FunctionWrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.toolkit.trace;
+
+import java.util.function.Function;
+
+/** {@link Function} decorator marked {@code @TraceCrossThread}; delegates every call to the wrapped function. */
+@TraceCrossThread
+public class FunctionWrapper<T, R> implements Function<T, R> {
+    final Function<T, R> function;
+
+    public FunctionWrapper(Function<T, R> function) {
+        this.function = function;
+    }
+
+    public static <T, R> FunctionWrapper<T, R> of(Function<T, R> function) {
+        return new FunctionWrapper<>(function); // diamond: parameterized instance, no raw type
+    }
+
+    @Override
+    public R apply(T t) {
+        return this.function.apply(t);
+    }
+}
\ No newline at end of file
diff --git a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestController.java b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestController.java
index 8f3d221..9cff2f2 100644
--- a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestController.java
+++ b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestController.java
@@ -84,12 +84,41 @@ public class TestController {
             }
             return true;
         });
+        testService.asyncSupplierThenAccept(() -> {
+            try {
+                visit("http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/supplier");
+            } catch (IOException e) {
+                // ignore
+            }
+            return true;
+        }, c -> {
+            try {
+                visit("http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/consumer");
+            } catch (IOException e) {
+                // ignore
+            }
+        });
+        testService.asyncSupplierThenApply(() -> {
+            try {
+                visit("http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/supplier");
+            } catch (IOException e) {
+                // ignore
+            }
+            return true;
+        }, f -> {
+            try {
+                visit("http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/function");
+            } catch (IOException e) {
+                // ignore
+            }
+            return true;
+        });
 
         // meter
         MeterFactory.counter("test_counter").tag("ck1", "cv1").build().increment(2d);
         MeterFactory.gauge("test_gauge", () -> 1d).tag("gk1", "gv1").build();
         MeterFactory.histogram("test_histogram").tag("hk1", "hv1").steps(Arrays.asList(1d, 5d, 10d))
-                    .build().addValue(4d);
+                .build().addValue(4d);
         return SUCCESS;
     }
 
@@ -116,6 +145,18 @@ public class TestController {
         return SUCCESS;
     }
 
+    @RequestMapping("/asyncVisit/consumer")
+    public String asyncVisitConsumer() { // ActiveSpan.tag gets the correlation value, or "" when getCorrelation is empty
+        ActiveSpan.tag(CORRELATION_CONTEXT_TAG_KEY, TraceContext.getCorrelation(CORRELATION_CONTEXT_KEY).orElse(""));
+        return SUCCESS;
+    }
+
+    @RequestMapping("/asyncVisit/function")
+    public String asyncVisitFunction() { // ActiveSpan.tag gets the correlation value, or "" when getCorrelation is empty
+        ActiveSpan.tag(CORRELATION_CONTEXT_TAG_KEY, TraceContext.getCorrelation(CORRELATION_CONTEXT_KEY).orElse(""));
+        return SUCCESS;
+    }
+
     private static void visit(String url) throws IOException {
         CloseableHttpClient httpclient = HttpClients.createDefault();
         try {
diff --git a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestService.java b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestService.java
index ac21326..ded85f6 100644
--- a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestService.java
+++ b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestService.java
@@ -23,11 +23,15 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Supplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.skywalking.apm.toolkit.model.User;
 import org.apache.skywalking.apm.toolkit.trace.ActiveSpan;
 import org.apache.skywalking.apm.toolkit.trace.CallableWrapper;
 import org.apache.skywalking.apm.toolkit.trace.RunnableWrapper;
 import org.apache.skywalking.apm.toolkit.trace.SupplierWrapper;
+import org.apache.skywalking.apm.toolkit.trace.ConsumerWrapper;
+import org.apache.skywalking.apm.toolkit.trace.FunctionWrapper;
 import org.apache.skywalking.apm.toolkit.trace.Tag;
 import org.apache.skywalking.apm.toolkit.trace.Trace;
 import org.springframework.stereotype.Component;
@@ -105,6 +109,14 @@ public class TestService {
         CompletableFuture.supplyAsync(SupplierWrapper.of(supplier));
     }
 
+    public void asyncSupplierThenAccept(Supplier<Boolean> supplier, Consumer<Boolean> consumer) {
+        CompletableFuture.supplyAsync(SupplierWrapper.of(supplier)).thenAccept(ConsumerWrapper.of(consumer));
+    } // fire-and-forget: the wrapped supplier runs via supplyAsync, its result feeds the wrapped consumer, the future is dropped
+
+    public <R> void asyncSupplierThenApply(Supplier<Boolean> supplier, Function<Boolean, R> function) {
+        CompletableFuture.supplyAsync(SupplierWrapper.of(supplier)).thenApply(FunctionWrapper.of(function));
+    } // fire-and-forget; Function is parameterized so FunctionWrapper.of(...) and thenApply(...) need no unchecked conversion
+
     @Trace
     public void testSetOperationName(String operationName) {
         ActiveSpan.setOperationName(operationName);