Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/03/14 07:50:36 UTC

[incubator-skywalking] 01/01: Finish the prototype of async mode in Java agent core APIs.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch async-context
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git

commit aa2877b2fb45611d8cadeaff5687717c2bedeaa0
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Mar 14 15:50:22 2019 +0800

    Finish the prototype of async mode in Java agent core APIs.
---
 .../agent/core/context/AbstractTracerContext.java  | 31 ++++++----
 .../apm/agent/core/context/AsyncSpan.java          | 55 ++++++++++++++++++
 .../apm/agent/core/context/ContextManager.java     | 19 +++---
 .../agent/core/context/IgnoredTracerContext.java   |  8 +++
 .../apm/agent/core/context/TracingContext.java     | 67 ++++++++++++++++------
 .../apm/agent/core/context/trace/AbstractSpan.java |  3 +-
 .../core/context/trace/AbstractTracingSpan.java    | 31 +++++++---
 .../apm/agent/core/context/trace/NoopSpan.java     |  8 +++
 docs/en/guides/Java-Plugin-Development-Guide.md    | 36 ++++++++++++
 9 files changed, 210 insertions(+), 48 deletions(-)

diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java
index a7c5453..1b3a93b 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java
@@ -16,7 +16,6 @@
  *
  */
 
-
 package org.apache.skywalking.apm.agent.core.context;
 
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
@@ -28,41 +27,38 @@ import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
  */
 public interface AbstractTracerContext {
     /**
-     * Prepare for the cross-process propagation.
-     * How to initialize the carrier, depends on the implementation.
+     * Prepare for the cross-process propagation. How to initialize the carrier, depends on the implementation.
      *
      * @param carrier to carry the context for crossing process.
      */
     void inject(ContextCarrier carrier);
 
     /**
-     * Build the reference between this segment and a cross-process segment.
-     * How to build, depends on the implementation.
+     * Build the reference between this segment and a cross-process segment. How to build, depends on the
+     * implementation.
      *
      * @param carrier carried the context from a cross-process segment.
      */
     void extract(ContextCarrier carrier);
 
     /**
-     * Capture a snapshot for cross-thread propagation.
-     * It's a similar concept with ActiveSpan.Continuation in OpenTracing-java
-     * How to build, depends on the implementation.
+     * Capture a snapshot for cross-thread propagation. It's a similar concept to ActiveSpan.Continuation in
+     * OpenTracing-java. How to build, depends on the implementation.
      *
      * @return the {@link ContextSnapshot} , which includes the reference context.
      */
     ContextSnapshot capture();
 
     /**
-     * Build the reference between this segment and a cross-thread segment.
-     * How to build, depends on the implementation.
+     * Build the reference between this segment and a cross-thread segment. How to build, depends on the
+     * implementation.
      *
      * @param snapshot from {@link #capture()} in the parent thread.
      */
     void continued(ContextSnapshot snapshot);
 
     /**
-     * Get the global trace id, if needEnhance.
-     * How to build, depends on the implementation.
+     * Get the global trace id, if needEnhance. How to build, depends on the implementation.
      *
      * @return the string represents the id.
      */
@@ -105,4 +101,15 @@ public interface AbstractTracerContext {
      */
     void stopSpan(AbstractSpan span);
 
+    /**
+     * Notify this context that the current span is going to be finished asynchronously in another thread.
+     */
+    AbstractTracerContext awaitFinishAsync();
+
+    /**
+     * The given span could be stopped officially.
+     *
+     * @param span to be stopped.
+     */
+    void asyncStop(AsyncSpan span);
 }
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AsyncSpan.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AsyncSpan.java
new file mode 100644
index 0000000..f667f59
--- /dev/null
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AsyncSpan.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.context;
+
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+
+/**
+ * A span can use these APIs to activate async mode and extend its life cycle across threads.
+ *
+ * This is typically used in async plugins, especially RPC plugins.
+ *
+ * @author wusheng
+ */
+public interface AsyncSpan {
+    /**
+     * The span finishes in the current tracing context, but stays alive until {@link #asyncFinish} is
+     * called.
+     *
+     * This method must be called<br/>
+     * 1. In the original thread (tracing context).
+     * 2. While the current span is the active span.
+     *
+     * While the span is alive, its tags, logs and attributes can be changed in any thread.
+     *
+     * The number of calls to {@link #prepareForAsync} and {@link #asyncFinish()} must match.
+     *
+     * @return the current span
+     */
+    AbstractSpan prepareForAsync();
+
+    /**
+     * Notify the span that it can be finished.
+     *
+     * The number of calls to {@link #prepareForAsync} and {@link #asyncFinish()} must match.
+     *
+     * @return the current span
+     */
+    AbstractSpan asyncFinish();
+}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java
index aeaa708..378602b 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java
@@ -18,14 +18,11 @@
 
 package org.apache.skywalking.apm.agent.core.context;
 
-import org.apache.skywalking.apm.agent.core.boot.BootService;
-import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.boot.*;
 import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
-import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
-import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.core.context.trace.*;
 import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
-import org.apache.skywalking.apm.agent.core.logging.api.ILog;
-import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.agent.core.logging.api.*;
 import org.apache.skywalking.apm.agent.core.sampling.SamplingService;
 import org.apache.skywalking.apm.util.StringUtil;
 
@@ -59,7 +56,7 @@ public class ContextManager implements TracingContextListener, BootService, Igno
             } else {
                 if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue()
                     && RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue()
-                    ) {
+                ) {
                     context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);
                 } else {
                     /**
@@ -152,6 +149,14 @@ public class ContextManager implements TracingContextListener, BootService, Igno
         }
     }
 
+    public static AbstractTracerContext awaitFinishAsync(AbstractSpan span) {
+        AbstractSpan activeSpan = activeSpan();
+        if (span != activeSpan) {
+            throw new RuntimeException("Span is not the active span in the current context.");
+        }
+        return get().awaitFinishAsync();
+    }
+
     public static AbstractSpan activeSpan() {
         return get().activeSpan();
     }
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContext.java
index fd115e5..cef591e 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContext.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContext.java
@@ -95,6 +95,14 @@ public class IgnoredTracerContext implements AbstractTracerContext {
         }
     }
 
+    @Override public AbstractTracerContext awaitFinishAsync() {
+        return this;
+    }
+
+    @Override public void asyncStop(AsyncSpan span) {
+
+    }
+
     public static class ListenerManager {
         private static List<IgnoreTracerContextListener> LISTENERS = new LinkedList<IgnoreTracerContextListener>();
 
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
index df35ada..66f90cc 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
@@ -18,25 +18,14 @@
 
 package org.apache.skywalking.apm.agent.core.context;
 
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
 import org.apache.skywalking.apm.agent.core.conf.Config;
-import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
-import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
-import org.apache.skywalking.apm.agent.core.context.trace.EntrySpan;
-import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan;
-import org.apache.skywalking.apm.agent.core.context.trace.LocalSpan;
-import org.apache.skywalking.apm.agent.core.context.trace.NoopExitSpan;
-import org.apache.skywalking.apm.agent.core.context.trace.NoopSpan;
-import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
-import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
-import org.apache.skywalking.apm.agent.core.context.trace.WithPeerInfo;
-import org.apache.skywalking.apm.agent.core.dictionary.DictionaryManager;
-import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
-import org.apache.skywalking.apm.agent.core.dictionary.PossibleFound;
-import org.apache.skywalking.apm.agent.core.logging.api.ILog;
-import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.agent.core.context.trace.*;
+import org.apache.skywalking.apm.agent.core.dictionary.*;
+import org.apache.skywalking.apm.agent.core.logging.api.*;
 import org.apache.skywalking.apm.agent.core.sampling.SamplingService;
 import org.apache.skywalking.apm.util.StringUtil;
 
@@ -81,6 +70,13 @@ public class TracingContext implements AbstractTracerContext {
     private int spanIdGenerator;
 
     /**
+     * The counter indicates the number of spans prepared for async mode that have not been finished yet.
+     */
+    private AtomicInteger asyncSpanCounter;
+    private volatile boolean isRunningInAsyncMode;
+    private ReentrantLock asyncFinishLock;
+
+    /**
      * Initialize all fields with default value.
      */
     TracingContext() {
@@ -89,6 +85,9 @@ public class TracingContext implements AbstractTracerContext {
         if (samplingService == null) {
             samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
         }
+        asyncSpanCounter = new AtomicInteger(0);
+        isRunningInAsyncMode = false;
+        asyncFinishLock = new ReentrantLock();
     }
 
     /**
@@ -407,9 +406,39 @@ public class TracingContext implements AbstractTracerContext {
             throw new IllegalStateException("Stopping the unexpected span = " + span);
         }
 
-        if (activeSpanStack.isEmpty()) {
-            this.finish();
+        if (checkFinishConditions()) {
+            finish();
+        }
+    }
+
+    @Override public AbstractTracerContext awaitFinishAsync() {
+        isRunningInAsyncMode = true;
+        asyncSpanCounter.addAndGet(1);
+        return this;
+    }
+
+    @Override public void asyncStop(AsyncSpan span) {
+        asyncSpanCounter.addAndGet(-1);
+
+        if (checkFinishConditions()) {
+            finish();
+        }
+    }
+
+    private boolean checkFinishConditions() {
+        if (isRunningInAsyncMode) {
+            asyncFinishLock.lock();
+        }
+        try {
+            if (activeSpanStack.isEmpty() && asyncSpanCounter.get() == 0) {
+                return true;
+            }
+        } finally {
+            if (isRunningInAsyncMode) {
+                asyncFinishLock.unlock();
+            }
         }
+        return false;
     }
 
     /**
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractSpan.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractSpan.java
index 17e8aef..0085896 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractSpan.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractSpan.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.agent.core.context.trace;
 
 import java.util.Map;
+import org.apache.skywalking.apm.agent.core.context.AsyncSpan;
 import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
 import org.apache.skywalking.apm.network.trace.component.Component;
 import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
@@ -28,7 +29,7 @@ import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
  *
  * @author wusheng
  */
-public interface AbstractSpan {
+public interface AbstractSpan extends AsyncSpan {
     /**
      * Set the component id, which defines in {@link ComponentsDefine}
      *
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
index 803ce1b..e05e845 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
@@ -18,15 +18,10 @@
 
 package org.apache.skywalking.apm.agent.core.context.trace;
 
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
-import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
-import org.apache.skywalking.apm.agent.core.context.util.KeyValuePair;
-import org.apache.skywalking.apm.agent.core.context.util.TagValuePair;
-import org.apache.skywalking.apm.agent.core.context.util.ThrowableTransformer;
+import java.util.*;
+import org.apache.skywalking.apm.agent.core.context.*;
+import org.apache.skywalking.apm.agent.core.context.tag.*;
+import org.apache.skywalking.apm.agent.core.context.util.*;
 import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
 import org.apache.skywalking.apm.network.language.agent.SpanType;
 import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2;
@@ -45,6 +40,9 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
     protected String operationName;
     protected int operationId;
     protected SpanLayer layer;
+    protected boolean isInAsyncMode = false;
+    protected volatile AbstractTracerContext context;
+
     /**
      * The start time of this Span.
      */
@@ -322,4 +320,19 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
             refs.add(ref);
         }
     }
+
+    @Override public AbstractSpan prepareForAsync() {
+        context = ContextManager.awaitFinishAsync(this);
+        return this;
+    }
+
+    @Override public AbstractSpan asyncFinish() {
+        if (!isInAsyncMode) {
+            throw new RuntimeException("Span is not in async mode, please use '#prepareForAsync' to activate it.");
+        }
+
+        this.endTime = System.currentTimeMillis();
+        context.asyncStop(this);
+        return this;
+    }
 }
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/NoopSpan.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/NoopSpan.java
index 774ebca..d91a813 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/NoopSpan.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/NoopSpan.java
@@ -115,4 +115,12 @@ public class NoopSpan implements AbstractSpan {
     @Override public AbstractSpan setPeer(String remotePeer) {
         return this;
     }
+
+    @Override public AbstractSpan prepareForAsync() {
+        return this;
+    }
+
+    @Override public AbstractSpan asyncFinish() {
+        return this;
+    }
 }
diff --git a/docs/en/guides/Java-Plugin-Development-Guide.md b/docs/en/guides/Java-Plugin-Development-Guide.md
index fd6ed45..9927707 100644
--- a/docs/en/guides/Java-Plugin-Development-Guide.md
+++ b/docs/en/guides/Java-Plugin-Development-Guide.md
@@ -160,6 +160,42 @@ SpanLayer is the catalog of span. Here are 5 values:
 Component IDs are defined and reserved by SkyWalking project.
 For component name/ID extension, please follow [cComponent library definition and extension](Component-library-settings.md) document.
 
+### Advanced APIs
+#### Async Span APIs
+There is a set of advanced APIs in Span that are specific to async scenarios. When tags, logs, or attributes (including the end time) of the span
+need to be set in another thread, you should use these APIs.
+
+```java
+    /**
+     * The span finishes in the current tracing context, but stays alive until {@link #asyncFinish} is
+     * called.
+     *
+     * This method must be called<br/>
+     * 1. In the original thread (tracing context).
+     * 2. While the current span is the active span.
+     *
+     * While the span is alive, its tags, logs and attributes can be changed in any thread.
+     *
+     * The number of calls to {@link #prepareForAsync} and {@link #asyncFinish()} must match.
+     *
+     * @return the current span
+     */
+    AbstractSpan prepareForAsync();
+
+    /**
+     * Notify the span that it can be finished.
+     *
+     * The number of calls to {@link #prepareForAsync} and {@link #asyncFinish()} must match.
+     *
+     * @return the current span
+     */
+    AbstractSpan asyncFinish();
+```
+1. Call `#prepareForAsync` in the original context.
+1. Propagate the span to any other thread.
+1. After everything is set, call `#asyncFinish` in any thread.
+1. The tracing context is finished and reported to the backend once every `#prepareForAsync` call has a matching `#asyncFinish` call (judged by counting the API executions).
+
 ## Develop a plugin
 ### Abstract
 The basic method to trace is intercepting a Java method, by using byte code manipulation tech and AOP concept.
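
The finish condition this commit introduces (the active span stack is empty and the async counter is back at zero) can be illustrated with a small standalone sketch. It is only an approximation for reading purposes: `MiniContext` and `AsyncFinishSketch` are hypothetical names, spans are modeled as plain strings, and the sketch uses `synchronized` methods instead of the `ReentrantLock` and `AtomicInteger` used in `TracingContext`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a tracing context; this is not the SkyWalking TracingContext. */
class MiniContext {
    private final Deque<String> activeSpans = new ArrayDeque<>();
    private int asyncSpanCounter = 0;
    private boolean finished = false;

    synchronized void createSpan(String name) {
        activeSpans.push(name);
    }

    /** Plays the role of prepareForAsync(): the span outlives its stopSpan() call. */
    synchronized void awaitFinishAsync() {
        asyncSpanCounter++;
    }

    synchronized void stopSpan(String name) {
        if (!name.equals(activeSpans.peek())) {
            throw new IllegalStateException("Stopping the unexpected span = " + name);
        }
        activeSpans.pop();
        finishIfDone();
    }

    /** Plays the role of asyncFinish(): may be called from any thread. */
    synchronized void asyncStop() {
        asyncSpanCounter--;
        finishIfDone();
    }

    /** The context finishes only when no span is active and no async span is pending. */
    private void finishIfDone() {
        if (activeSpans.isEmpty() && asyncSpanCounter == 0) {
            finished = true;
        }
    }

    synchronized boolean isFinished() {
        return finished;
    }
}

class AsyncFinishSketch {
    public static void main(String[] args) throws InterruptedException {
        MiniContext ctx = new MiniContext();
        ctx.createSpan("rpc-call");
        ctx.awaitFinishAsync();                // step 1: called in the original thread
        ctx.stopSpan("rpc-call");              // stack is empty, but one async span is pending
        System.out.println(ctx.isFinished());  // prints "false"

        Thread callback = new Thread(ctx::asyncStop); // steps 2-3: finish in another thread
        callback.start();
        callback.join();
        System.out.println(ctx.isFinished());  // prints "true"
    }
}
```

The sketch shows why the counts of the two calls must match: one missing `asyncStop()` keeps the context from ever finishing, while an extra one drives the counter below zero.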