You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/11/01 13:37:04 UTC
[skywalking-java] branch main updated: [Enhance]For trace
CompletableFuture thenApply or thenAccept (#60)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git
The following commit(s) were added to refs/heads/main by this push:
new 1308d3c [Enhance]For trace CompletableFuture thenApply or thenAccept (#60)
1308d3c is described below
commit 1308d3c0ae360cf4632f87363365f22c4c6fc89f
Author: will2020-power <63...@users.noreply.github.com>
AuthorDate: Mon Nov 1 21:36:34 2021 +0800
[Enhance]For trace CompletableFuture thenApply or thenAccept (#60)
---
CHANGES.md | 1 +
.../apm/toolkit/trace/ConsumerWrapper.java | 40 ++++++++
.../apm/toolkit/trace/FunctionWrapper.java | 40 ++++++++
.../trace/CallableOrRunnableActivation.java | 6 +-
.../Application-toolkit-trace-cross-thread.md | 18 ++++
.../config/expectedData.yaml | 107 ++++++++++++++++++++-
.../apm/toolkit/trace/ConsumerWrapper.java | 40 ++++++++
.../apm/toolkit/trace/FunctionWrapper.java | 40 ++++++++
.../toolkit/controller/TestController.java | 43 ++++++++-
.../testcase/toolkit/controller/TestService.java | 12 +++
10 files changed, 344 insertions(+), 3 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index beded13..bc3cedb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,6 +6,7 @@ Release Notes.
------------------
* Support `Transaction` and fix duplicated methods enhancements for `jedis-2.x` plugin.
+* Add ConsumerWrapper/FunctionWrapper to support CompletableFuture.x.thenAcceptAsync/thenApplyAsync.
#### Documentation
diff --git a/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/ConsumerWrapper.java b/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/ConsumerWrapper.java
new file mode 100644
index 0000000..9d05ef1
--- /dev/null
+++ b/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/ConsumerWrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.toolkit.trace;
+
+import java.util.function.Consumer;
+
+@TraceCrossThread
+public class ConsumerWrapper<V> implements Consumer<V> {
+ final Consumer<V> consumer;
+
+ public ConsumerWrapper(Consumer<V> consumer) {
+ this.consumer = consumer;
+ }
+
+ public static <V> ConsumerWrapper<V> of(Consumer<V> consumer) {
+ return new ConsumerWrapper(consumer);
+ }
+
+ @Override
+ public void accept(V v) {
+ this.consumer.accept(v);
+ }
+
+}
diff --git a/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/FunctionWrapper.java b/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/FunctionWrapper.java
new file mode 100644
index 0000000..33878c6
--- /dev/null
+++ b/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/FunctionWrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.toolkit.trace;
+
+import java.util.function.Function;
+
+@TraceCrossThread
+public class FunctionWrapper<T, R> implements Function<T, R> {
+ final Function<T, R> function;
+
+ public FunctionWrapper(Function<T, R> function) {
+ this.function = function;
+ }
+
+ public static <T, R> FunctionWrapper<T, R> of(Function<T, R> function) {
+ return new FunctionWrapper(function);
+ }
+
+ @Override
+ public R apply(T t) {
+ return this.function.apply(t);
+ }
+
+}
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/CallableOrRunnableActivation.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/CallableOrRunnableActivation.java
index 1584376..b3a6414 100644
--- a/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/CallableOrRunnableActivation.java
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/CallableOrRunnableActivation.java
@@ -41,6 +41,8 @@ public class CallableOrRunnableActivation extends ClassInstanceMethodsEnhancePlu
private static final String CALL_METHOD_NAME = "call";
private static final String RUN_METHOD_NAME = "run";
private static final String GET_METHOD_NAME = "get";
+ private static final String APPLY_METHOD_NAME = "apply";
+ private static final String ACCEPT_METHOD_NAME = "accept";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
@@ -68,7 +70,9 @@ public class CallableOrRunnableActivation extends ClassInstanceMethodsEnhancePlu
return named(CALL_METHOD_NAME)
.and(takesArguments(0))
.or(named(RUN_METHOD_NAME).and(takesArguments(0)))
- .or(named(GET_METHOD_NAME).and(takesArguments(0)));
+ .or(named(GET_METHOD_NAME).and(takesArguments(0)))
+ .or(named(APPLY_METHOD_NAME).and(takesArguments(1)))
+ .or(named(ACCEPT_METHOD_NAME).and(takesArguments(1)));
}
@Override
diff --git a/docs/en/setup/service-agent/java-agent/Application-toolkit-trace-cross-thread.md b/docs/en/setup/service-agent/java-agent/Application-toolkit-trace-cross-thread.md
index cfd171b..a65424a 100644
--- a/docs/en/setup/service-agent/java-agent/Application-toolkit-trace-cross-thread.md
+++ b/docs/en/setup/service-agent/java-agent/Application-toolkit-trace-cross-thread.md
@@ -57,6 +57,24 @@ or
return "SupplierWrapper";
})).thenAccept(System.out::println);
```
+* usage 4.
+```java
+ CompletableFuture.supplyAsync(SupplierWrapper.of(() -> {
+ return "SupplierWrapper";
+ })).thenAcceptAsync(ConsumerWrapper.of(c -> {
+ // your code visit(url)
+ System.out.println("ConsumerWrapper");
+ }));
+```
+or
+```java
+ CompletableFuture.supplyAsync(SupplierWrapper.of(() -> {
+ return "SupplierWrapper";
+ })).thenApplyAsync(FunctionWrapper.of(f -> {
+ // your code visit(url)
+ return "FunctionWrapper";
+ }));
+```
_Sample codes only_
diff --git a/test/plugin/scenarios/apm-toolkit-trace-scenario/config/expectedData.yaml b/test/plugin/scenarios/apm-toolkit-trace-scenario/config/expectedData.yaml
index 2ae6fb0..71ae848 100644
--- a/test/plugin/scenarios/apm-toolkit-trace-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/apm-toolkit-trace-scenario/config/expectedData.yaml
@@ -334,7 +334,112 @@ segmentItems:
parentTraceSegmentId: not null, parentServiceInstance: not null, parentService: apm-toolkit-trace-scenario,
traceId: not null}
skipAnalysis: 'true'
-
+ - segmentId: not null
+ spans:
+ - operationName: /apm-toolkit-trace-scenario/case/asyncVisit/consumer
+ parentSpanId: 0
+ spanId: 1
+ spanLayer: Http
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 2
+ isError: false
+ spanType: Exit
+ peer: localhost:8080
+ tags:
+ - {key: url, value: 'http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/consumer'}
+ - {key: http.method, value: GET}
+ skipAnalysis: 'true'
+ - operationName: Thread/org.apache.skywalking.apm.toolkit.trace.ConsumerWrapper/accept
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Unknown
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 0
+ isError: false
+ spanType: Local
+ peer: ''
+ refs:
+ - {parentEndpoint: GET:/case/tool-kit, networkAddress: '', refType: CrossThread,
+ parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not
+ null, parentService: apm-toolkit-trace-scenario, traceId: not null}
+ skipAnalysis: 'true'
+ - segmentId: not null
+ spans:
+ - operationName: GET:/case/asyncVisit/function
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 14
+ isError: false
+ spanType: Entry
+ peer: ''
+ tags:
+ - {key: url, value: 'http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/function'}
+ - {key: http.method, value: GET}
+ - {key: correlation, value: correlationValueTest}
+ refs:
+ - {parentEndpoint: Thread/org.apache.skywalking.apm.toolkit.trace.FunctionWrapper/apply,
+ networkAddress: 'localhost:8080', refType: CrossProcess, parentSpanId: 1,
+ parentTraceSegmentId: not null, parentServiceInstance: not null, parentService: apm-toolkit-trace-scenario,
+ traceId: not null}
+ skipAnalysis: 'true'
+ - segmentId: not null
+ spans:
+ - operationName: /apm-toolkit-trace-scenario/case/asyncVisit/function
+ parentSpanId: 0
+ spanId: 1
+ spanLayer: Http
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 2
+ isError: false
+ spanType: Exit
+ peer: localhost:8080
+ tags:
+ - {key: url, value: 'http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/function'}
+ - {key: http.method, value: GET}
+ skipAnalysis: 'true'
+ - operationName: Thread/org.apache.skywalking.apm.toolkit.trace.FunctionWrapper/apply
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Unknown
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 0
+ isError: false
+ spanType: Local
+ peer: ''
+ refs:
+ - {parentEndpoint: GET:/case/tool-kit, networkAddress: '', refType: CrossThread,
+ parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not
+ null, parentService: apm-toolkit-trace-scenario, traceId: not null}
+ skipAnalysis: 'true'
+ - segmentId: not null
+ spans:
+ - operationName: GET:/case/asyncVisit/consumer
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 14
+ isError: false
+ spanType: Entry
+ peer: ''
+ tags:
+ - {key: url, value: 'http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/consumer'}
+ - {key: http.method, value: GET}
+ - {key: correlation, value: correlationValueTest}
+ refs:
+ - {parentEndpoint: Thread/org.apache.skywalking.apm.toolkit.trace.ConsumerWrapper/accept,
+ networkAddress: 'localhost:8080', refType: CrossProcess, parentSpanId: 1,
+ parentTraceSegmentId: not null, parentServiceInstance: not null, parentService: apm-toolkit-trace-scenario,
+ traceId: not null}
+ skipAnalysis: 'true'
meterItems:
- serviceName: apm-toolkit-trace-scenario
meterSize: 3
diff --git a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/ConsumerWrapper.java b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/ConsumerWrapper.java
new file mode 100644
index 0000000..02ca819
--- /dev/null
+++ b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/ConsumerWrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.toolkit.trace;
+
+import java.util.function.Consumer;
+
+@TraceCrossThread
+public class ConsumerWrapper<V> implements Consumer<V> {
+ final Consumer<V> consumer;
+
+ public ConsumerWrapper(Consumer<V> consumer) {
+ this.consumer = consumer;
+ }
+
+ public static <V> ConsumerWrapper<V> of(Consumer<V> consumer) {
+ return new ConsumerWrapper(consumer);
+ }
+
+ @Override
+ public void accept(V v) {
+ this.consumer.accept(v);
+ }
+
+}
diff --git a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/FunctionWrapper.java b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/FunctionWrapper.java
new file mode 100644
index 0000000..7cd780c
--- /dev/null
+++ b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/FunctionWrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.toolkit.trace;
+
+import java.util.function.Function;
+
+@TraceCrossThread
+public class FunctionWrapper<T, R> implements Function<T, R> {
+ final Function<T, R> function;
+
+ public FunctionWrapper(Function<T, R> function) {
+ this.function = function;
+ }
+
+ public static <T, R> FunctionWrapper<T, R> of(Function<T, R> function) {
+ return new FunctionWrapper(function);
+ }
+
+ @Override
+ public R apply(T t) {
+ return this.function.apply(t);
+ }
+
+}
\ No newline at end of file
diff --git a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestController.java b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestController.java
index 8f3d221..9cff2f2 100644
--- a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestController.java
+++ b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestController.java
@@ -84,12 +84,41 @@ public class TestController {
}
return true;
});
+ testService.asyncSupplierThenAccept(() -> {
+ try {
+ visit("http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/supplier");
+ } catch (IOException e) {
+ // ignore
+ }
+ return true;
+ }, c -> {
+ try {
+ visit("http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/consumer");
+ } catch (IOException e) {
+ // ignore
+ }
+ });
+ testService.asyncSupplierThenApply(() -> {
+ try {
+ visit("http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/supplier");
+ } catch (IOException e) {
+ // ignore
+ }
+ return true;
+ }, f -> {
+ try {
+ visit("http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/function");
+ } catch (IOException e) {
+ // ignore
+ }
+ return true;
+ });
// meter
MeterFactory.counter("test_counter").tag("ck1", "cv1").build().increment(2d);
MeterFactory.gauge("test_gauge", () -> 1d).tag("gk1", "gv1").build();
MeterFactory.histogram("test_histogram").tag("hk1", "hv1").steps(Arrays.asList(1d, 5d, 10d))
- .build().addValue(4d);
+ .build().addValue(4d);
return SUCCESS;
}
@@ -116,6 +145,18 @@ public class TestController {
return SUCCESS;
}
+ @RequestMapping("/asyncVisit/consumer")
+ public String asyncVisitConsumer() {
+ ActiveSpan.tag(CORRELATION_CONTEXT_TAG_KEY, TraceContext.getCorrelation(CORRELATION_CONTEXT_KEY).orElse(""));
+ return SUCCESS;
+ }
+
+ @RequestMapping("/asyncVisit/function")
+ public String asyncVisitFunction() {
+ ActiveSpan.tag(CORRELATION_CONTEXT_TAG_KEY, TraceContext.getCorrelation(CORRELATION_CONTEXT_KEY).orElse(""));
+ return SUCCESS;
+ }
+
private static void visit(String url) throws IOException {
CloseableHttpClient httpclient = HttpClients.createDefault();
try {
diff --git a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestService.java b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestService.java
index ac21326..ded85f6 100644
--- a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestService.java
+++ b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/apache/skywalking/apm/testcase/toolkit/controller/TestService.java
@@ -23,11 +23,15 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
+import java.util.function.Consumer;
+import java.util.function.Function;
import org.apache.skywalking.apm.toolkit.model.User;
import org.apache.skywalking.apm.toolkit.trace.ActiveSpan;
import org.apache.skywalking.apm.toolkit.trace.CallableWrapper;
import org.apache.skywalking.apm.toolkit.trace.RunnableWrapper;
import org.apache.skywalking.apm.toolkit.trace.SupplierWrapper;
+import org.apache.skywalking.apm.toolkit.trace.ConsumerWrapper;
+import org.apache.skywalking.apm.toolkit.trace.FunctionWrapper;
import org.apache.skywalking.apm.toolkit.trace.Tag;
import org.apache.skywalking.apm.toolkit.trace.Trace;
import org.springframework.stereotype.Component;
@@ -105,6 +109,14 @@ public class TestService {
CompletableFuture.supplyAsync(SupplierWrapper.of(supplier));
}
+ public void asyncSupplierThenAccept(Supplier<Boolean> supplier, Consumer<Boolean> consumer) {
+ CompletableFuture.supplyAsync(SupplierWrapper.of(supplier)).thenAccept(ConsumerWrapper.of(consumer));
+ }
+
+ public void asyncSupplierThenApply(Supplier<Boolean> supplier, Function function) {
+ CompletableFuture.supplyAsync(SupplierWrapper.of(supplier)).thenApply(FunctionWrapper.of(function));
+ }
+
@Trace
public void testSetOperationName(String operationName) {
ActiveSpan.setOperationName(operationName);