You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/02/15 08:38:31 UTC

[skywalking-java] branch main updated: Fix gRPC plugin not working for server side in some case (#457)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b35f06d15 Fix gRPC plugin not working for server side in some case (#457)
6b35f06d15 is described below

commit 6b35f06d1510e2be6184f3cc87331d9226365718
Author: Kanro <hi...@live.cn>
AuthorDate: Wed Feb 15 16:38:24 2023 +0800

    Fix gRPC plugin not working for server side in some case (#457)
---
 CHANGES.md                                         |  1 +
 .../AbstractServerImplBuilderInstrumentation.java  |  7 ++---
 .../AbstractServerImplBuilderInterceptor.java      | 10 ++++--
 .../plugin/grpc/v1/server/ServerInterceptor.java   | 36 ++++++++++++++++++++--
 .../plugin/grpc/v1/server/TracingServerCall.java   |  4 ++-
 .../grpc/v1/server/TracingServerCallListener.java  | 30 +++---------------
 6 files changed, 53 insertions(+), 35 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index e0a9a6bc76..23f4db54dc 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,7 @@ Release Notes.
 * Add plugin to support ClickHouse JDBC driver (0.3.2.*).
 * Refactor kotlin coroutine plugin with CoroutineContext.
 * Fix OracleURLParser ignoring actual port when :SID is absent.
+* Change gRPC instrumentation point to fix plugin not working for server side.
 
 #### Documentation
 * Update docs of Tracing APIs, reorganize the API docs into six parts.
diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/define/AbstractServerImplBuilderInstrumentation.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/define/AbstractServerImplBuilderInstrumentation.java
index e0823d9301..f32987f4bc 100644
--- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/define/AbstractServerImplBuilderInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/define/AbstractServerImplBuilderInstrumentation.java
@@ -26,15 +26,14 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInst
 import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
 
 import static net.bytebuddy.matcher.ElementMatchers.named;
-import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments;
 import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
 
 public class AbstractServerImplBuilderInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
 
     public static final String ENHANCE_CLASS = "io.grpc.internal.AbstractServerImplBuilder";
-    public static final String ENHANCE_METHOD = "addService";
+    public static final String ENHANCE_METHOD = "build";
     public static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.grpc.v1.server.AbstractServerImplBuilderInterceptor";
-    public static final String ARGUMENT_TYPE = "io.grpc.ServerServiceDefinition";
 
     @Override
     public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
@@ -47,7 +46,7 @@ public class AbstractServerImplBuilderInstrumentation extends ClassInstanceMetho
             new InstanceMethodsInterceptPoint() {
                 @Override
                 public ElementMatcher<MethodDescription> getMethodsMatcher() {
-                    return named(ENHANCE_METHOD).and(takesArgumentWithType(0, ARGUMENT_TYPE));
+                    return named(ENHANCE_METHOD).and(takesNoArguments());
                 }
 
                 @Override
diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/AbstractServerImplBuilderInterceptor.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/AbstractServerImplBuilderInterceptor.java
index 605c785bd6..c02cf566de 100644
--- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/AbstractServerImplBuilderInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/AbstractServerImplBuilderInterceptor.java
@@ -18,8 +18,7 @@
 
 package org.apache.skywalking.apm.plugin.grpc.v1.server;
 
-import io.grpc.ServerInterceptors;
-import io.grpc.ServerServiceDefinition;
+import io.grpc.ServerBuilder;
 
 import java.lang.reflect.Method;
 
@@ -34,7 +33,12 @@ public class AbstractServerImplBuilderInterceptor implements InstanceMethodsArou
     @Override
     public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
         MethodInterceptResult result) {
-        allArguments[0] = ServerInterceptors.intercept((ServerServiceDefinition) allArguments[0], new ServerInterceptor());
+        if (objInst.getSkyWalkingDynamicField() == null) {
+            ServerBuilder<?> builder = (ServerBuilder) objInst;
+            ServerInterceptor interceptor = new ServerInterceptor();
+            builder.intercept(interceptor);
+            objInst.setSkyWalkingDynamicField(interceptor);
+        }
     }
 
     @Override
diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java
index e5c94fbf6b..f970d813f0 100644
--- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java
@@ -18,14 +18,27 @@
 
 package org.apache.skywalking.apm.plugin.grpc.v1.server;
 
+import io.grpc.Context;
+import io.grpc.Contexts;
 import io.grpc.Metadata;
 import io.grpc.ServerCall;
 import io.grpc.ServerCallHandler;
 import org.apache.skywalking.apm.agent.core.context.CarrierItem;
 import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
 import org.apache.skywalking.apm.util.StringUtil;
 
+import static org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil.formatOperationName;
+
 public class ServerInterceptor implements io.grpc.ServerInterceptor {
+
+    static final Context.Key<ContextSnapshot> CONTEXT_SNAPSHOT_KEY = Context.key("skywalking-grpc-context-snapshot");
+    static final Context.Key<AbstractSpan> ACTIVE_SPAN_KEY = Context.key("skywalking-grpc-active-span");
+
     @Override
     public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(ServerCall<REQUEST, RESPONSE> call,
         Metadata headers, ServerCallHandler<REQUEST, RESPONSE> handler) {
@@ -38,7 +51,26 @@ public class ServerInterceptor implements io.grpc.ServerInterceptor {
                 next.setHeadValue(contextValue);
             }
         }
-        return new TracingServerCallListener<>(handler.startCall(new TracingServerCall<>(call), headers), call
-                .getMethodDescriptor(), contextCarrier);
+
+        final AbstractSpan span = ContextManager
+                .createEntrySpan(formatOperationName(call.getMethodDescriptor()), contextCarrier);
+        span.setComponent(ComponentsDefine.GRPC);
+        span.setLayer(SpanLayer.RPC_FRAMEWORK);
+        ContextSnapshot contextSnapshot = ContextManager.capture();
+        AbstractSpan asyncSpan = span.prepareForAsync();
+
+        Context context = Context.current().withValues(CONTEXT_SNAPSHOT_KEY, contextSnapshot, ACTIVE_SPAN_KEY, asyncSpan);
+
+        ServerCall.Listener<REQUEST> listener = Contexts.interceptCall(
+                context,
+                new TracingServerCall<>(call),
+                headers,
+                (serverCall, metadata) -> new TracingServerCallListener<>(
+                        handler.startCall(serverCall, metadata),
+                        serverCall.getMethodDescriptor()
+                )
+        );
+        ContextManager.stopSpan(asyncSpan);
+        return listener;
     }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java
index 66935421b6..3361c4b7db 100644
--- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java
+++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java
@@ -49,7 +49,7 @@ public class TracingServerCall<REQUEST, RESPONSE> extends ForwardingServerCall.S
             final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + RESPONSE_ON_MESSAGE_OPERATION_NAME);
             span.setComponent(ComponentsDefine.GRPC);
             span.setLayer(SpanLayer.RPC_FRAMEWORK);
-
+            ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
             try {
                 super.sendMessage(message);
             } catch (Throwable t) {
@@ -68,6 +68,7 @@ public class TracingServerCall<REQUEST, RESPONSE> extends ForwardingServerCall.S
         final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + RESPONSE_ON_CLOSE_OPERATION_NAME);
         span.setComponent(ComponentsDefine.GRPC);
         span.setLayer(SpanLayer.RPC_FRAMEWORK);
+        ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
         switch (status.getCode()) {
             case OK:
                 break;
@@ -93,6 +94,7 @@ public class TracingServerCall<REQUEST, RESPONSE> extends ForwardingServerCall.S
                 break;
         }
         Tags.RPC_RESPONSE_STATUS_CODE.set(span, status.getCode().name());
+        ServerInterceptor.ACTIVE_SPAN_KEY.get().asyncFinish();
 
         try {
             super.close(status, trailers);
diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java
index 3b0c830a94..cb19e86c6c 100644
--- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java
+++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java
@@ -21,9 +21,7 @@ package org.apache.skywalking.apm.plugin.grpc.v1.server;
 import io.grpc.ForwardingServerCallListener;
 import io.grpc.MethodDescriptor;
 import io.grpc.ServerCall;
-import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
 import org.apache.skywalking.apm.agent.core.context.ContextManager;
-import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
 import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
 import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
@@ -36,18 +34,11 @@ import org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil;
 public class TracingServerCallListener<REQUEST> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<REQUEST> {
     private final MethodDescriptor.MethodType methodType;
     private final String operationPrefix;
-    private final String operation;
-    private final ContextCarrier contextCarrier;
 
-    private AbstractSpan asyncSpan;
-    private ContextSnapshot contextSnapshot;
-
-    protected TracingServerCallListener(ServerCall.Listener<REQUEST> delegate, MethodDescriptor<REQUEST, ?> descriptor, ContextCarrier contextCarrier) {
+    protected TracingServerCallListener(ServerCall.Listener<REQUEST> delegate, MethodDescriptor<REQUEST, ?> descriptor) {
         super(delegate);
         this.methodType = descriptor.getType();
         this.operationPrefix = OperationNameFormatUtil.formatOperationName(descriptor) + SERVER;
-        this.operation = OperationNameFormatUtil.formatOperationName(descriptor);
-        this.contextCarrier = contextCarrier;
     }
 
     @Override
@@ -57,7 +48,7 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList
             final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_MESSAGE_OPERATION_NAME);
             span.setComponent(ComponentsDefine.GRPC);
             span.setLayer(SpanLayer.RPC_FRAMEWORK);
-            ContextManager.continued(contextSnapshot);
+            ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
             try {
                 super.onMessage(message);
             } catch (Throwable t) {
@@ -73,13 +64,10 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList
 
     @Override
     public void onCancel() {
-        if (contextSnapshot == null) {
-            return;
-        }
         final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_CANCEL_OPERATION_NAME);
         span.setComponent(ComponentsDefine.GRPC);
         span.setLayer(SpanLayer.RPC_FRAMEWORK);
-        ContextManager.continued(contextSnapshot);
+        ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
         try {
             super.onCancel();
         } catch (Throwable t) {
@@ -87,7 +75,7 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList
             throw t;
         } finally {
             ContextManager.stopSpan();
-            asyncSpan.asyncFinish();
+            ServerInterceptor.ACTIVE_SPAN_KEY.get().asyncFinish();
         }
     }
 
@@ -96,7 +84,7 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList
         final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_HALF_CLOSE_OPERATION_NAME);
         span.setComponent(ComponentsDefine.GRPC);
         span.setLayer(SpanLayer.RPC_FRAMEWORK);
-        ContextManager.continued(contextSnapshot);
+        ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
         try {
             super.onHalfClose();
         } catch (Throwable t) {
@@ -110,18 +98,10 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList
     @Override
     public void onComplete() {
         super.onComplete();
-        asyncSpan.asyncFinish();
     }
 
     @Override
     public void onReady() {
-        final AbstractSpan span = ContextManager.createEntrySpan(operation, contextCarrier);
-        span.setComponent(ComponentsDefine.GRPC);
-        span.setLayer(SpanLayer.RPC_FRAMEWORK);
-        contextSnapshot = ContextManager.capture();
-        asyncSpan = span.prepareForAsync();
-        ContextManager.stopSpan(asyncSpan);
-
         super.onReady();
     }
 }
\ No newline at end of file