You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/03/20 03:32:01 UTC
[skywalking] branch master updated: fix finagle: process NoopSpan (#4544)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 5251cc7 fix finagle: process NoopSpan (#4544)
5251cc7 is described below
commit 5251cc7239d184d3b72069373cd83a3cb84e7482
Author: yoje <hu...@gmail.com>
AuthorDate: Fri Mar 20 11:31:52 2020 +0800
fix finagle: process NoopSpan (#4544)
fix KafkaProducer CallbackInterceptor npe
Co-authored-by: huangyongjie <hu...@tigerbrokers>
Co-authored-by: 吴晟 Wu Sheng <wu...@foxmail.com>
---
.../apm/plugin/finagle/AnnotationInterceptor.java | 3 +-
.../ClientDestTracingFilterInterceptor.java | 3 +-
.../skywalking/apm/plugin/finagle/CodecUtils.java | 71 +++++++++++++---------
.../skywalking/apm/plugin/finagle/Constants.java | 2 +
.../apm/plugin/finagle/ContextCarrierHelper.java | 39 +++++++-----
.../finagle/ServerTracingFilterInterceptor.java | 3 +-
.../apm/plugin/finagle/CodecUtilsTest.java | 26 ++++----
.../apm/plugin/kafka/CallbackInterceptor.java | 5 +-
8 files changed, 90 insertions(+), 62 deletions(-)
diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/AnnotationInterceptor.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/AnnotationInterceptor.java
index 51492e2..7cc25f5 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/AnnotationInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/AnnotationInterceptor.java
@@ -19,7 +19,6 @@
package org.apache.skywalking.apm.plugin.finagle;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
-import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
@@ -66,7 +65,7 @@ public class AnnotationInterceptor {
* which comes from client.
*/
span.setOperationName(rpc);
- tryInjectContext((ExitSpan) span);
+ tryInjectContext(span);
}
}
}
diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ClientDestTracingFilterInterceptor.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ClientDestTracingFilterInterceptor.java
index 5f083de..291660f 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ClientDestTracingFilterInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ClientDestTracingFilterInterceptor.java
@@ -20,7 +20,6 @@ package org.apache.skywalking.apm.plugin.finagle;
import com.twitter.finagle.Address;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
-import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
@@ -47,7 +46,7 @@ public class ClientDestTracingFilterInterceptor extends AbstractInterceptor {
public void beforeMethodImpl(EnhancedInstance enhancedInstance, Method method, Object[] objects, Class<?>[] classes, MethodInterceptResult methodInterceptResult) throws Throwable {
String peer = (String) enhancedInstance.getSkyWalkingDynamicField();
getLocalContextHolder().let(FinagleCtxs.PEER_HOST, peer);
- tryInjectContext((ExitSpan) getSpan());
+ tryInjectContext(getSpan());
}
@Override
diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/CodecUtils.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/CodecUtils.java
index 52426e5..d9c5503 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/CodecUtils.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/CodecUtils.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.util.StringUtil;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
@@ -33,6 +34,8 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.skywalking.apm.plugin.finagle.Constants.EMPTY_SWCONTEXTCARRIER;
+
public class CodecUtils {
static ILog LOGGER = LogManager.getLogger(CodecUtils.class);
@@ -68,21 +71,24 @@ public class CodecUtils {
* @return
*/
static Buf encode(SWContextCarrier swContextCarrier) {
- ByteArrayOutputStream bos = getBos();
- try (DataOutputStream dos = new DataOutputStream(bos)) {
- putString(dos, swContextCarrier.getOperationName());
- CarrierItem next = swContextCarrier.getCarrier().items();
- while (next.hasNext()) {
- next = next.next();
- if (next.getHeadKey() != null && next.getHeadValue() != null) {
- putString(dos, next.getHeadKey());
- putString(dos, next.getHeadValue());
+ if (StringUtil.isNotEmpty(swContextCarrier.getOperationName())
+ && swContextCarrier.getCarrier() != null) {
+ ByteArrayOutputStream bos = getBos();
+ try (DataOutputStream dos = new DataOutputStream(bos)) {
+ putString(dos, swContextCarrier.getOperationName());
+ CarrierItem next = swContextCarrier.getCarrier().items();
+ while (next.hasNext()) {
+ next = next.next();
+ if (next.getHeadKey() != null && next.getHeadValue() != null) {
+ putString(dos, next.getHeadKey());
+ putString(dos, next.getHeadValue());
+ }
}
+ bos.flush();
+ return Bufs.ownedBuf(bos.toByteArray());
+ } catch (Exception e) {
+ LOGGER.error("encode swContextCarrier exception.", e);
}
- bos.flush();
- return Bufs.ownedBuf(bos.toByteArray());
- } catch (Exception e) {
- LOGGER.error("encode swContextCarrier exception.", e);
}
return Bufs.EMPTY;
}
@@ -102,23 +108,32 @@ public class CodecUtils {
* @return
*/
static SWContextCarrier decode(Buf buf) {
- ContextCarrier contextCarrier = new ContextCarrier();
- SWContextCarrier swContextCarrier = new SWContextCarrier();
- swContextCarrier.setContextCarrier(contextCarrier);
-
- ByteBuffer byteBuffer = ByteBuffer.wrap(Bufs.ownedByteArray(buf));
- String operationName = getNextString(byteBuffer);
- if (operationName != null) {
- swContextCarrier.setOperationName(operationName);
- }
+ try {
+ byte[] bytes = Bufs.ownedByteArray(buf);
+ if (bytes == null || bytes.length == 0) {
+ return EMPTY_SWCONTEXTCARRIER;
+ }
+ ContextCarrier contextCarrier = new ContextCarrier();
+ SWContextCarrier swContextCarrier = new SWContextCarrier();
+ swContextCarrier.setContextCarrier(contextCarrier);
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ String operationName = getNextString(byteBuffer);
+ if (operationName != null) {
+ swContextCarrier.setOperationName(operationName);
+ }
- Map<String, String> data = readToMap(byteBuffer);
- CarrierItem next = contextCarrier.items();
- while (next.hasNext()) {
- next = next.next();
- next.setHeadValue(data.get(next.getHeadKey()));
+ Map<String, String> data = readToMap(byteBuffer);
+ CarrierItem next = contextCarrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ next.setHeadValue(data.get(next.getHeadKey()));
+ }
+ return swContextCarrier;
+ } catch (Exception e) {
+ LOGGER.error("decode swContextCarrier exception.", e);
}
- return swContextCarrier;
+ return EMPTY_SWCONTEXTCARRIER;
}
private static void putString(DataOutputStream dos, String value) throws IOException {
diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/Constants.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/Constants.java
index 791a624..e32045d 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/Constants.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/Constants.java
@@ -21,4 +21,6 @@ package org.apache.skywalking.apm.plugin.finagle;
public class Constants {
public static final String PENDING_OP_NAME = "pending";
+
+ public static final SWContextCarrier EMPTY_SWCONTEXTCARRIER = new SWContextCarrier();
}
diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ContextCarrierHelper.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ContextCarrierHelper.java
index 1f09451..a1ac9d1 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ContextCarrierHelper.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ContextCarrierHelper.java
@@ -19,7 +19,9 @@
package org.apache.skywalking.apm.plugin.finagle;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.ExitTypeSpan;
import static org.apache.skywalking.apm.plugin.finagle.Constants.PENDING_OP_NAME;
import static org.apache.skywalking.apm.plugin.finagle.FinagleCtxs.getPeerHost;
@@ -34,23 +36,28 @@ class ContextCarrierHelper {
* interceptor, we check if the op name and peer information are exists in LocalContext, if it exists, we set it
* to span and inject to contextCarrier.
*/
- static void tryInjectContext(ExitSpan span) {
- String operationName = span.getOperationName();
- if (PENDING_OP_NAME.equals(operationName)) {
- return;
- }
- String peer = getPeerHost();
- if (peer == null) {
- return;
- }
- span.setPeer(peer);
+ static void tryInjectContext(AbstractSpan span) {
+ /*
+ * this may be a {@link NoopSpan}.
+ */
+ if (span != null && span.isExit()) {
+ String operationName = span.getOperationName();
+ if (PENDING_OP_NAME.equals(operationName)) {
+ return;
+ }
+ String peer = getPeerHost();
+ if (peer == null) {
+ return;
+ }
+ span.setPeer(peer);
- ContextCarrier contextCarrier = new ContextCarrier();
- span.inject(contextCarrier);
+ ContextCarrier contextCarrier = new ContextCarrier();
+ ((ExitTypeSpan) span).inject(contextCarrier);
- SWContextCarrier swContextCarrier = getSWContextCarrier();
- // we can ensure swContextCarrier is not null here
- swContextCarrier.setContextCarrier(contextCarrier);
- swContextCarrier.setOperationName(operationName);
+ SWContextCarrier swContextCarrier = getSWContextCarrier();
+ // we can ensure swContextCarrier is not null here
+ swContextCarrier.setContextCarrier(contextCarrier);
+ swContextCarrier.setOperationName(operationName);
+ }
}
}
diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ServerTracingFilterInterceptor.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ServerTracingFilterInterceptor.java
index 6cd22be..b473cac 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ServerTracingFilterInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ServerTracingFilterInterceptor.java
@@ -21,7 +21,6 @@ package org.apache.skywalking.apm.plugin.finagle;
import com.twitter.finagle.context.Contexts;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
-import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
@@ -49,7 +48,7 @@ public class ServerTracingFilterInterceptor extends AbstractInterceptor {
SWContextCarrier swContextCarrier = Contexts.broadcast().apply(SWContextCarrier$.MODULE$);
span = ContextManager.createEntrySpan(swContextCarrier.getOperationName(), swContextCarrier.getCarrier());
} else {
- span = ContextManager.createEntrySpan("unknown", new ContextCarrier());
+ span = ContextManager.createEntrySpan("unknown", null);
}
span.setComponent(FINAGLE);
diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java
index d7763d6..4d4f1ab 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.UUID;
import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
public class CodecUtilsTest {
@@ -43,9 +44,7 @@ public class CodecUtilsTest {
swContextCarrier = makeSWContextCarrier();
assertSwContextCarrier(swContextCarrier, CodecUtils.decode(CodecUtils.encode(swContextCarrier)));
- ContextCarrier contextCarrier = new ContextCarrier();
swContextCarrier = new SWContextCarrier();
- swContextCarrier.setContextCarrier(contextCarrier);
assertSwContextCarrier(swContextCarrier, CodecUtils.decode(Bufs.EMPTY));
}
@@ -65,15 +64,20 @@ public class CodecUtilsTest {
private void assertSwContextCarrier(SWContextCarrier expected, SWContextCarrier actual) {
assertThat(expected.getOperationName(), is(actual.getOperationName()));
Map<String, String> data = new HashMap<>();
- CarrierItem next = expected.getCarrier().items();
- while (next.hasNext()) {
- next = next.next();
- data.put(next.getHeadKey(), next.getHeadValue());
- }
- next = actual.getCarrier().items();
- while (next.hasNext()) {
- next = next.next();
- assertThat(next.getHeadValue(), is(data.get(next.getHeadKey())));
+ if (actual.getCarrier() == null) {
+ assertNull(expected.getCarrier());
+ } else {
+ CarrierItem next = expected.getCarrier().items();
+ while (next.hasNext()) {
+ next = next.next();
+ data.put(next.getHeadKey(), next.getHeadValue());
+ }
+ next = actual.getCarrier().items();
+ while (next.hasNext()) {
+ next = next.next();
+ assertThat(next.getHeadValue(), is(data.get(next.getHeadKey())));
+ }
}
+
}
}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
index d834c49..96e763e 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java
@@ -44,7 +44,10 @@ public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {
RecordMetadata metadata = (RecordMetadata) allArguments[0];
AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback");
activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
- Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
+ if (metadata != null) {
+ // Null if an error occurred during processing of this record
+ Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
+ }
ContextManager.continued(snapshot);
}
}