You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/02/15 08:38:31 UTC
[skywalking-java] branch main updated: Fix gRPC plugin not working for server side in some case (#457)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git
The following commit(s) were added to refs/heads/main by this push:
new 6b35f06d15 Fix gRPC plugin not working for server side in some case (#457)
6b35f06d15 is described below
commit 6b35f06d1510e2be6184f3cc87331d9226365718
Author: Kanro <hi...@live.cn>
AuthorDate: Wed Feb 15 16:38:24 2023 +0800
Fix gRPC plugin not working for server side in some case (#457)
---
CHANGES.md | 1 +
.../AbstractServerImplBuilderInstrumentation.java | 7 ++---
.../AbstractServerImplBuilderInterceptor.java | 10 ++++--
.../plugin/grpc/v1/server/ServerInterceptor.java | 36 ++++++++++++++++++++--
.../plugin/grpc/v1/server/TracingServerCall.java | 4 ++-
.../grpc/v1/server/TracingServerCallListener.java | 30 +++---------------
6 files changed, 53 insertions(+), 35 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index e0a9a6bc76..23f4db54dc 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,7 @@ Release Notes.
* Add plugin to support ClickHouse JDBC driver (0.3.2.*).
* Refactor kotlin coroutine plugin with CoroutineContext.
* Fix OracleURLParser ignoring actual port when :SID is absent.
+* Change gRPC instrumentation point to fix plugin not working for server side.
#### Documentation
* Update docs of Tracing APIs, reorganize the API docs into six parts.
diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/define/AbstractServerImplBuilderInstrumentation.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/define/AbstractServerImplBuilderInstrumentation.java
index e0823d9301..f32987f4bc 100644
--- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/define/AbstractServerImplBuilderInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/define/AbstractServerImplBuilderInstrumentation.java
@@ -26,15 +26,14 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInst
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
-import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
public class AbstractServerImplBuilderInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String ENHANCE_CLASS = "io.grpc.internal.AbstractServerImplBuilder";
- public static final String ENHANCE_METHOD = "addService";
+ public static final String ENHANCE_METHOD = "build";
public static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.grpc.v1.server.AbstractServerImplBuilderInterceptor";
- public static final String ARGUMENT_TYPE = "io.grpc.ServerServiceDefinition";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
@@ -47,7 +46,7 @@ public class AbstractServerImplBuilderInstrumentation extends ClassInstanceMetho
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
- return named(ENHANCE_METHOD).and(takesArgumentWithType(0, ARGUMENT_TYPE));
+ return named(ENHANCE_METHOD).and(takesNoArguments());
}
@Override
diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/AbstractServerImplBuilderInterceptor.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/AbstractServerImplBuilderInterceptor.java
index 605c785bd6..c02cf566de 100644
--- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/AbstractServerImplBuilderInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/AbstractServerImplBuilderInterceptor.java
@@ -18,8 +18,7 @@
package org.apache.skywalking.apm.plugin.grpc.v1.server;
-import io.grpc.ServerInterceptors;
-import io.grpc.ServerServiceDefinition;
+import io.grpc.ServerBuilder;
import java.lang.reflect.Method;
@@ -34,7 +33,12 @@ public class AbstractServerImplBuilderInterceptor implements InstanceMethodsArou
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) {
- allArguments[0] = ServerInterceptors.intercept((ServerServiceDefinition) allArguments[0], new ServerInterceptor());
+ if (objInst.getSkyWalkingDynamicField() == null) {
+ ServerBuilder<?> builder = (ServerBuilder) objInst;
+ ServerInterceptor interceptor = new ServerInterceptor();
+ builder.intercept(interceptor);
+ objInst.setSkyWalkingDynamicField(interceptor);
+ }
}
@Override
diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java
index e5c94fbf6b..f970d813f0 100644
--- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java
@@ -18,14 +18,27 @@
package org.apache.skywalking.apm.plugin.grpc.v1.server;
+import io.grpc.Context;
+import io.grpc.Contexts;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.util.StringUtil;
+import static org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil.formatOperationName;
+
public class ServerInterceptor implements io.grpc.ServerInterceptor {
+
+ static final Context.Key<ContextSnapshot> CONTEXT_SNAPSHOT_KEY = Context.key("skywalking-grpc-context-snapshot");
+ static final Context.Key<AbstractSpan> ACTIVE_SPAN_KEY = Context.key("skywalking-grpc-active-span");
+
@Override
public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(ServerCall<REQUEST, RESPONSE> call,
Metadata headers, ServerCallHandler<REQUEST, RESPONSE> handler) {
@@ -38,7 +51,26 @@ public class ServerInterceptor implements io.grpc.ServerInterceptor {
next.setHeadValue(contextValue);
}
}
- return new TracingServerCallListener<>(handler.startCall(new TracingServerCall<>(call), headers), call
- .getMethodDescriptor(), contextCarrier);
+
+ final AbstractSpan span = ContextManager
+ .createEntrySpan(formatOperationName(call.getMethodDescriptor()), contextCarrier);
+ span.setComponent(ComponentsDefine.GRPC);
+ span.setLayer(SpanLayer.RPC_FRAMEWORK);
+ ContextSnapshot contextSnapshot = ContextManager.capture();
+ AbstractSpan asyncSpan = span.prepareForAsync();
+
+ Context context = Context.current().withValues(CONTEXT_SNAPSHOT_KEY, contextSnapshot, ACTIVE_SPAN_KEY, asyncSpan);
+
+ ServerCall.Listener<REQUEST> listener = Contexts.interceptCall(
+ context,
+ new TracingServerCall<>(call),
+ headers,
+ (serverCall, metadata) -> new TracingServerCallListener<>(
+ handler.startCall(serverCall, metadata),
+ serverCall.getMethodDescriptor()
+ )
+ );
+ ContextManager.stopSpan(asyncSpan);
+ return listener;
}
}
diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java
index 66935421b6..3361c4b7db 100644
--- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java
+++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java
@@ -49,7 +49,7 @@ public class TracingServerCall<REQUEST, RESPONSE> extends ForwardingServerCall.S
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + RESPONSE_ON_MESSAGE_OPERATION_NAME);
span.setComponent(ComponentsDefine.GRPC);
span.setLayer(SpanLayer.RPC_FRAMEWORK);
-
+ ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
try {
super.sendMessage(message);
} catch (Throwable t) {
@@ -68,6 +68,7 @@ public class TracingServerCall<REQUEST, RESPONSE> extends ForwardingServerCall.S
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + RESPONSE_ON_CLOSE_OPERATION_NAME);
span.setComponent(ComponentsDefine.GRPC);
span.setLayer(SpanLayer.RPC_FRAMEWORK);
+ ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
switch (status.getCode()) {
case OK:
break;
@@ -93,6 +94,7 @@ public class TracingServerCall<REQUEST, RESPONSE> extends ForwardingServerCall.S
break;
}
Tags.RPC_RESPONSE_STATUS_CODE.set(span, status.getCode().name());
+ ServerInterceptor.ACTIVE_SPAN_KEY.get().asyncFinish();
try {
super.close(status, trailers);
diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java
index 3b0c830a94..cb19e86c6c 100644
--- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java
+++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java
@@ -21,9 +21,7 @@ package org.apache.skywalking.apm.plugin.grpc.v1.server;
import io.grpc.ForwardingServerCallListener;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
-import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
-import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
@@ -36,18 +34,11 @@ import org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil;
public class TracingServerCallListener<REQUEST> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<REQUEST> {
private final MethodDescriptor.MethodType methodType;
private final String operationPrefix;
- private final String operation;
- private final ContextCarrier contextCarrier;
- private AbstractSpan asyncSpan;
- private ContextSnapshot contextSnapshot;
-
- protected TracingServerCallListener(ServerCall.Listener<REQUEST> delegate, MethodDescriptor<REQUEST, ?> descriptor, ContextCarrier contextCarrier) {
+ protected TracingServerCallListener(ServerCall.Listener<REQUEST> delegate, MethodDescriptor<REQUEST, ?> descriptor) {
super(delegate);
this.methodType = descriptor.getType();
this.operationPrefix = OperationNameFormatUtil.formatOperationName(descriptor) + SERVER;
- this.operation = OperationNameFormatUtil.formatOperationName(descriptor);
- this.contextCarrier = contextCarrier;
}
@Override
@@ -57,7 +48,7 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_MESSAGE_OPERATION_NAME);
span.setComponent(ComponentsDefine.GRPC);
span.setLayer(SpanLayer.RPC_FRAMEWORK);
- ContextManager.continued(contextSnapshot);
+ ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
try {
super.onMessage(message);
} catch (Throwable t) {
@@ -73,13 +64,10 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList
@Override
public void onCancel() {
- if (contextSnapshot == null) {
- return;
- }
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_CANCEL_OPERATION_NAME);
span.setComponent(ComponentsDefine.GRPC);
span.setLayer(SpanLayer.RPC_FRAMEWORK);
- ContextManager.continued(contextSnapshot);
+ ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
try {
super.onCancel();
} catch (Throwable t) {
@@ -87,7 +75,7 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList
throw t;
} finally {
ContextManager.stopSpan();
- asyncSpan.asyncFinish();
+ ServerInterceptor.ACTIVE_SPAN_KEY.get().asyncFinish();
}
}
@@ -96,7 +84,7 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_HALF_CLOSE_OPERATION_NAME);
span.setComponent(ComponentsDefine.GRPC);
span.setLayer(SpanLayer.RPC_FRAMEWORK);
- ContextManager.continued(contextSnapshot);
+ ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
try {
super.onHalfClose();
} catch (Throwable t) {
@@ -110,18 +98,10 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList
@Override
public void onComplete() {
super.onComplete();
- asyncSpan.asyncFinish();
}
@Override
public void onReady() {
- final AbstractSpan span = ContextManager.createEntrySpan(operation, contextCarrier);
- span.setComponent(ComponentsDefine.GRPC);
- span.setLayer(SpanLayer.RPC_FRAMEWORK);
- contextSnapshot = ContextManager.capture();
- asyncSpan = span.prepareForAsync();
- ContextManager.stopSpan(asyncSpan);
-
super.onReady();
}
}
\ No newline at end of file