You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/03/20 03:32:01 UTC

[skywalking] branch master updated: fix finagle: process NoopSpan (#4544)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 5251cc7  fix finagle: process NoopSpan (#4544)
5251cc7 is described below

commit 5251cc7239d184d3b72069373cd83a3cb84e7482
Author: yoje <hu...@gmail.com>
AuthorDate: Fri Mar 20 11:31:52 2020 +0800

    fix finagle: process NoopSpan (#4544)
    
    fix NPE in KafkaProducer CallbackInterceptor
    
    Co-authored-by: huangyongjie <hu...@tigerbrokers>
    Co-authored-by: 吴晟 Wu Sheng <wu...@foxmail.com>
---
 .../apm/plugin/finagle/AnnotationInterceptor.java  |  3 +-
 .../ClientDestTracingFilterInterceptor.java        |  3 +-
 .../skywalking/apm/plugin/finagle/CodecUtils.java  | 71 +++++++++++++---------
 .../skywalking/apm/plugin/finagle/Constants.java   |  2 +
 .../apm/plugin/finagle/ContextCarrierHelper.java   | 39 +++++++-----
 .../finagle/ServerTracingFilterInterceptor.java    |  3 +-
 .../apm/plugin/finagle/CodecUtilsTest.java         | 26 ++++----
 .../apm/plugin/kafka/CallbackInterceptor.java      |  5 +-
 8 files changed, 90 insertions(+), 62 deletions(-)

diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/AnnotationInterceptor.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/AnnotationInterceptor.java
index 51492e2..7cc25f5 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/AnnotationInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/AnnotationInterceptor.java
@@ -19,7 +19,6 @@
 package org.apache.skywalking.apm.plugin.finagle;
 
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
-import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
 
@@ -66,7 +65,7 @@ public class AnnotationInterceptor {
                      * which comes from client.
                      */
                     span.setOperationName(rpc);
-                    tryInjectContext((ExitSpan) span);
+                    tryInjectContext(span);
                 }
             }
         }
diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ClientDestTracingFilterInterceptor.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ClientDestTracingFilterInterceptor.java
index 5f083de..291660f 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ClientDestTracingFilterInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ClientDestTracingFilterInterceptor.java
@@ -20,7 +20,6 @@ package org.apache.skywalking.apm.plugin.finagle;
 
 import com.twitter.finagle.Address;
 import org.apache.skywalking.apm.agent.core.context.ContextManager;
-import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
 
@@ -47,7 +46,7 @@ public class ClientDestTracingFilterInterceptor extends AbstractInterceptor {
     public void beforeMethodImpl(EnhancedInstance enhancedInstance, Method method, Object[] objects, Class<?>[] classes, MethodInterceptResult methodInterceptResult) throws Throwable {
         String peer = (String) enhancedInstance.getSkyWalkingDynamicField();
         getLocalContextHolder().let(FinagleCtxs.PEER_HOST, peer);
-        tryInjectContext((ExitSpan) getSpan());
+        tryInjectContext(getSpan());
     }
 
     @Override
diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/CodecUtils.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/CodecUtils.java
index 52426e5..d9c5503 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/CodecUtils.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/CodecUtils.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.agent.core.context.CarrierItem;
 import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
 import org.apache.skywalking.apm.agent.core.logging.api.ILog;
 import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.util.StringUtil;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -33,6 +34,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.skywalking.apm.plugin.finagle.Constants.EMPTY_SWCONTEXTCARRIER;
+
 public class CodecUtils {
 
     static ILog LOGGER = LogManager.getLogger(CodecUtils.class);
@@ -68,21 +71,24 @@ public class CodecUtils {
      * @return
      */
     static Buf encode(SWContextCarrier swContextCarrier) {
-        ByteArrayOutputStream bos = getBos();
-        try (DataOutputStream dos = new DataOutputStream(bos)) {
-            putString(dos, swContextCarrier.getOperationName());
-            CarrierItem next = swContextCarrier.getCarrier().items();
-            while (next.hasNext()) {
-                next = next.next();
-                if (next.getHeadKey() != null && next.getHeadValue() != null) {
-                    putString(dos, next.getHeadKey());
-                    putString(dos, next.getHeadValue());
+        if (StringUtil.isNotEmpty(swContextCarrier.getOperationName())
+                && swContextCarrier.getCarrier() != null) {
+            ByteArrayOutputStream bos = getBos();
+            try (DataOutputStream dos = new DataOutputStream(bos)) {
+                putString(dos, swContextCarrier.getOperationName());
+                CarrierItem next = swContextCarrier.getCarrier().items();
+                while (next.hasNext()) {
+                    next = next.next();
+                    if (next.getHeadKey() != null && next.getHeadValue() != null) {
+                        putString(dos, next.getHeadKey());
+                        putString(dos, next.getHeadValue());
+                    }
                 }
+                bos.flush();
+                return Bufs.ownedBuf(bos.toByteArray());
+            } catch (Exception e) {
+                LOGGER.error("encode swContextCarrier exception.", e);
             }
-            bos.flush();
-            return Bufs.ownedBuf(bos.toByteArray());
-        } catch (Exception e) {
-            LOGGER.error("encode swContextCarrier exception.", e);
         }
         return Bufs.EMPTY;
     }
@@ -102,23 +108,32 @@ public class CodecUtils {
      * @return
      */
     static SWContextCarrier decode(Buf buf) {
-        ContextCarrier contextCarrier = new ContextCarrier();
-        SWContextCarrier swContextCarrier = new SWContextCarrier();
-        swContextCarrier.setContextCarrier(contextCarrier);
-
-        ByteBuffer byteBuffer = ByteBuffer.wrap(Bufs.ownedByteArray(buf));
-        String operationName = getNextString(byteBuffer);
-        if (operationName != null) {
-            swContextCarrier.setOperationName(operationName);
-        }
+        try {
+            byte[] bytes = Bufs.ownedByteArray(buf);
+            if (bytes == null || bytes.length == 0) {
+                return EMPTY_SWCONTEXTCARRIER;
+            }
+            ContextCarrier contextCarrier = new ContextCarrier();
+            SWContextCarrier swContextCarrier = new SWContextCarrier();
+            swContextCarrier.setContextCarrier(contextCarrier);
+
+            ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+            String operationName = getNextString(byteBuffer);
+            if (operationName != null) {
+                swContextCarrier.setOperationName(operationName);
+            }
 
-        Map<String, String> data = readToMap(byteBuffer);
-        CarrierItem next = contextCarrier.items();
-        while (next.hasNext()) {
-            next = next.next();
-            next.setHeadValue(data.get(next.getHeadKey()));
+            Map<String, String> data = readToMap(byteBuffer);
+            CarrierItem next = contextCarrier.items();
+            while (next.hasNext()) {
+                next = next.next();
+                next.setHeadValue(data.get(next.getHeadKey()));
+            }
+            return swContextCarrier;
+        } catch (Exception e) {
+            LOGGER.error("decode swContextCarrier exception.", e);
         }
-        return swContextCarrier;
+        return EMPTY_SWCONTEXTCARRIER;
     }
 
     private static void putString(DataOutputStream dos, String value) throws IOException {
diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/Constants.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/Constants.java
index 791a624..e32045d 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/Constants.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/Constants.java
@@ -21,4 +21,6 @@ package org.apache.skywalking.apm.plugin.finagle;
 public class Constants {
 
     public static final String PENDING_OP_NAME = "pending";
+
+    public static final SWContextCarrier EMPTY_SWCONTEXTCARRIER = new SWContextCarrier();
 }
diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ContextCarrierHelper.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ContextCarrierHelper.java
index 1f09451..a1ac9d1 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ContextCarrierHelper.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ContextCarrierHelper.java
@@ -19,7 +19,9 @@
 package org.apache.skywalking.apm.plugin.finagle;
 
 import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
 import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.ExitTypeSpan;
 
 import static org.apache.skywalking.apm.plugin.finagle.Constants.PENDING_OP_NAME;
 import static org.apache.skywalking.apm.plugin.finagle.FinagleCtxs.getPeerHost;
@@ -34,23 +36,28 @@ class ContextCarrierHelper {
      * interceptor, we check whether the op name and peer information exist in LocalContext; if they do, we set them
      * on the span and inject the context into contextCarrier.
      */
-    static void tryInjectContext(ExitSpan span) {
-        String operationName = span.getOperationName();
-        if (PENDING_OP_NAME.equals(operationName)) {
-            return;
-        }
-        String peer = getPeerHost();
-        if (peer == null) {
-            return;
-        }
-        span.setPeer(peer);
+    static void tryInjectContext(AbstractSpan span) {
+        /*
+         * this may be a {@link NoopSpan}.
+         */
+        if (span != null && span.isExit()) {
+            String operationName = span.getOperationName();
+            if (PENDING_OP_NAME.equals(operationName)) {
+                return;
+            }
+            String peer = getPeerHost();
+            if (peer == null) {
+                return;
+            }
+            span.setPeer(peer);
 
-        ContextCarrier contextCarrier = new ContextCarrier();
-        span.inject(contextCarrier);
+            ContextCarrier contextCarrier = new ContextCarrier();
+            ((ExitTypeSpan) span).inject(contextCarrier);
 
-        SWContextCarrier swContextCarrier = getSWContextCarrier();
-        // we can ensure swContextCarrier is not null here
-        swContextCarrier.setContextCarrier(contextCarrier);
-        swContextCarrier.setOperationName(operationName);
+            SWContextCarrier swContextCarrier = getSWContextCarrier();
+            // we can ensure swContextCarrier is not null here
+            swContextCarrier.setContextCarrier(contextCarrier);
+            swContextCarrier.setOperationName(operationName);
+        }
     }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ServerTracingFilterInterceptor.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ServerTracingFilterInterceptor.java
index 6cd22be..b473cac 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ServerTracingFilterInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ServerTracingFilterInterceptor.java
@@ -21,7 +21,6 @@ package org.apache.skywalking.apm.plugin.finagle;
 import com.twitter.finagle.context.Contexts;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
-import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
 import org.apache.skywalking.apm.agent.core.context.ContextManager;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
 import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
@@ -49,7 +48,7 @@ public class ServerTracingFilterInterceptor extends AbstractInterceptor {
             SWContextCarrier swContextCarrier = Contexts.broadcast().apply(SWContextCarrier$.MODULE$);
             span = ContextManager.createEntrySpan(swContextCarrier.getOperationName(), swContextCarrier.getCarrier());
         } else {
-            span = ContextManager.createEntrySpan("unknown", new ContextCarrier());
+            span = ContextManager.createEntrySpan("unknown", null);
         }
 
         span.setComponent(FINAGLE);
diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java
index d7763d6..4d4f1ab 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 public class CodecUtilsTest {
@@ -43,9 +44,7 @@ public class CodecUtilsTest {
         swContextCarrier = makeSWContextCarrier();
         assertSwContextCarrier(swContextCarrier, CodecUtils.decode(CodecUtils.encode(swContextCarrier)));
 
-        ContextCarrier contextCarrier = new ContextCarrier();
         swContextCarrier = new SWContextCarrier();
-        swContextCarrier.setContextCarrier(contextCarrier);
         assertSwContextCarrier(swContextCarrier, CodecUtils.decode(Bufs.EMPTY));
     }
 
@@ -65,15 +64,20 @@ public class CodecUtilsTest {
     private void assertSwContextCarrier(SWContextCarrier expected, SWContextCarrier actual) {
         assertThat(expected.getOperationName(), is(actual.getOperationName()));
         Map<String, String> data = new HashMap<>();
-        CarrierItem next = expected.getCarrier().items();
-        while (next.hasNext()) {
-            next = next.next();
-            data.put(next.getHeadKey(), next.getHeadValue());
-        }
-        next = actual.getCarrier().items();
-        while (next.hasNext()) {
-            next = next.next();
-            assertThat(next.getHeadValue(), is(data.get(next.getHeadKey())));
+        if (actual.getCarrier() == null) {
+            assertNull(expected.getCarrier());
+        } else {
+            CarrierItem next = expected.getCarrier().items();
+            while (next.hasNext()) {
+                next = next.next();
+                data.put(next.getHeadKey(), next.getHeadValue());
+            }
+            next = actual.getCarrier().items();
+            while (next.hasNext()) {
+                next = next.next();
+                assertThat(next.getHeadValue(), is(data.get(next.getHeadKey())));
+            }
         }
+
     }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
index d834c49..96e763e 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
@@ -44,7 +44,10 @@ public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {
             RecordMetadata metadata = (RecordMetadata) allArguments[0];
             AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback");
             activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
-            Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
+            if (metadata != null) {
+                // metadata is null if an error occurred during processing of this record
+                Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
+            }
             ContextManager.continued(snapshot);
         }
     }