You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2020/03/26 01:02:30 UTC

[skywalking] branch master updated: Correlation protocol implement (#4555)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 569baf3  Correlation protocol implement (#4555)
569baf3 is described below

commit 569baf3236226b17c3b40e376e5681b5623abf38
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Mar 26 09:02:14 2020 +0800

    Correlation protocol implement (#4555)
    
    Co-authored-by: Mrproliu <mr...@lagou.com>
    Co-authored-by: 吴晟 Wu Sheng <wu...@foxmail.com>
    Co-authored-by: kezhenxu94 <ke...@apache.org>
---
 .../skywalking/apm/toolkit/trace/TraceContext.java |  21 +++
 .../skywalking/apm/agent/core/conf/Config.java     |  12 ++
 .../agent/core/context/AbstractTracerContext.java  |   4 +
 .../apm/agent/core/context/ContextCarrier.java     |   9 +-
 .../apm/agent/core/context/ContextManager.java     |   9 ++
 .../apm/agent/core/context/ContextSnapshot.java    |   9 +-
 .../apm/agent/core/context/CorrelationContext.java | 164 +++++++++++++++++++++
 .../agent/core/context/IgnoredTracerContext.java   |  16 +-
 .../core/context/SW7CorrelationCarrierItem.java    |  27 ++--
 .../apm/agent/core/context/TracingContext.java     |  16 +-
 .../core/context/ContextCarrierV2HeaderTest.java   |  53 ++++++-
 .../agent/core/context/CorrelationContextTest.java | 124 ++++++++++++++++
 .../apm/plugin/finagle/CodecUtilsTest.java         |   5 +-
 .../v4/HttpClientExecuteInterceptorTest.java       |   9 +-
 .../v4/HttpAsyncClientInterceptorTest.java         |   2 +-
 .../plugin/motan/MotanConsumerInterceptorTest.java |   5 +-
 .../ProducerOperationHandlerInterceptorTest.java   |   3 +-
 .../TransportClientHandlerInterceptorTest.java     |   3 +-
 .../ProducerOperationHandlerInterceptorTest.java   |   3 +-
 .../v1/TransportClientHandlerInterceptorTest.java  |   3 +-
 .../agent/core/context/MockContextSnapshot.java    |   2 +-
 .../trace/CorrelationContextGetInterceptor.java    |  45 ++++++
 .../trace/CorrelationContextPutInterceptor.java    |  46 ++++++
 .../activation/trace/TraceContextActivation.java   |  36 +++++
 ...ross-Process-Correlation-Headers-Protocol-v1.md |   2 +-
 .../java-agent/Application-toolkit-trace.md        |  11 ++
 docs/en/setup/service-agent/java-agent/README.md   |   4 +-
 .../config/expectedData.yaml                       |   3 +
 .../skywalking/apm/toolkit/trace/TraceContext.java |  21 +++
 .../toolkit/controller/TestController.java         |  10 ++
 30 files changed, 634 insertions(+), 43 deletions(-)

diff --git a/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/TraceContext.java b/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/TraceContext.java
index fb9a66e..c846f44 100644
--- a/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/TraceContext.java
+++ b/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/TraceContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.skywalking.apm.toolkit.trace;
 
+import java.util.Optional;
+
 /**
  * Try to access the sky-walking tracer context. The context is not existed, always. only the middleware, component, or
  * rpc-framework are supported in the current invoke stack, in the same thread, the context will be available.
@@ -33,4 +35,23 @@ public class TraceContext {
     public static String traceId() {
         return "";
     }
+
+    /**
+     * Try to get the custom value from trace context.
+     *
+     * @return custom data value.
+     */
+    public static Optional<String> getCorrelation(String key) {
+        return Optional.empty();
+    }
+
+    /**
+     * Put the custom key/value into trace context.
+     *
+     * @return previous value if it exists.
+     */
+    public static Optional<String> putCorrelation(String key, String value) {
+        return Optional.empty();
+    }
+
 }
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index ee4f87e..e518175 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -394,4 +394,16 @@ public class Config {
             public static int HTTP_PARAMS_LENGTH_THRESHOLD = 1024;
         }
     }
+
+    public static class Correlation {
+        /**
+         * Max element count in the correlation context.
+         */
+        public static int ELEMENT_MAX_NUMBER = 3;
+
+        /**
+         * Max value length of each element.
+         */
+        public static int VALUE_MAX_LENGTH = 128;
+    }
 }
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java
index 4fb0999..9af1d1b 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java
@@ -115,4 +115,8 @@ public interface AbstractTracerContext {
      */
     void asyncStop(AsyncSpan span);
 
+    /**
+     * Get current correlation context
+     */
+    CorrelationContext getCorrelationContext();
 }
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java
index fd3e079..d6281af 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java
@@ -70,8 +70,11 @@ public class ContextCarrier implements Serializable {
      */
     private DistributedTraceId primaryDistributedTraceId;
 
+    private CorrelationContext correlationContext = new CorrelationContext();
+
     public CarrierItem items() {
-        SW6CarrierItem sw6CarrierItem = new SW6CarrierItem(this, null);
+        SW7CorrelationCarrierItem sw7CorrelationCarrierItem = new SW7CorrelationCarrierItem(correlationContext, null);
+        SW6CarrierItem sw6CarrierItem = new SW6CarrierItem(this, sw7CorrelationCarrierItem);
         return new CarrierItemHead(sw6CarrierItem);
     }
 
@@ -234,6 +237,10 @@ public class ContextCarrier implements Serializable {
         this.entryServiceInstanceId = entryServiceInstanceId;
     }
 
+    public CorrelationContext getCorrelationContext() {
+        return correlationContext;
+    }
+
     public enum HeaderVersion {
         v2
     }
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java
index 0a5a96d..5608cde 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java
@@ -225,4 +225,13 @@ public class ContextManager implements BootService {
         return runtimeContext;
     }
 
+    public static CorrelationContext getCorrelationContext() {
+        final AbstractTracerContext tracerContext = get();
+        if (tracerContext == null) {
+            return null;
+        }
+
+        return tracerContext.getCorrelationContext();
+    }
+
 }
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java
index ec8d08d..5796763 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java
@@ -49,12 +49,15 @@ public class ContextSnapshot {
 
     private int entryApplicationInstanceId = DictionaryUtil.nullValue();
 
-    ContextSnapshot(ID traceSegmentId, int spanId, List<DistributedTraceId> distributedTraceIds) {
+    private CorrelationContext correlationContext;
+
+    ContextSnapshot(ID traceSegmentId, int spanId, List<DistributedTraceId> distributedTraceIds, CorrelationContext correlationContext) {
         this.traceSegmentId = traceSegmentId;
         this.spanId = spanId;
         if (distributedTraceIds != null) {
             this.primaryDistributedTraceId = distributedTraceIds.get(0);
         }
+        this.correlationContext = correlationContext.clone();
     }
 
     public void setEntryOperationName(String entryOperationName) {
@@ -108,4 +111,8 @@ public class ContextSnapshot {
     public boolean isFromCurrent() {
         return traceSegmentId.equals(ContextManager.capture().getTraceSegmentId());
     }
+
+    public CorrelationContext getCorrelationContext() {
+        return correlationContext;
+    }
 }
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java
new file mode 100644
index 0000000..5fbc56e
--- /dev/null
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.agent.core.context;
+
+import org.apache.skywalking.apm.agent.core.base64.Base64;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.util.StringUtil;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Correlation context, use to propagation user custom data.
+ * Working on the protocol and delegate set/get method.
+ */
+public class CorrelationContext {
+
+    private final Map<String, String> data;
+
+    public CorrelationContext() {
+        this.data = new HashMap<>(Config.Correlation.ELEMENT_MAX_NUMBER);
+    }
+
+    public Optional<String> put(String key, String value) {
+        // key must not null
+        if (key == null) {
+            return Optional.empty();
+        }
+
+        // remove and return previous value when value is empty
+        if (StringUtil.isEmpty(value)) {
+            return Optional.ofNullable(data.remove(key));
+        }
+
+        // check value length
+        if (value.length() > Config.Correlation.VALUE_MAX_LENGTH) {
+            return Optional.empty();
+        }
+
+        // already contain key
+        if (data.containsKey(key)) {
+            final String previousValue = data.put(key, value);
+            return Optional.of(previousValue);
+        }
+
+        // check keys count
+        if (data.size() >= Config.Correlation.ELEMENT_MAX_NUMBER) {
+            return Optional.empty();
+        }
+
+        // setting
+        data.put(key, value);
+        return Optional.empty();
+    }
+
+    public Optional<String> get(String key) {
+        if (key == null) {
+            return Optional.empty();
+        }
+
+        return Optional.ofNullable(data.get(key));
+    }
+
+    /**
+     * Serialize this {@link CorrelationContext} to a {@link String}
+     *
+     * @return the serialization string.
+     */
+    String serialize() {
+        if (data.isEmpty()) {
+            return "";
+        }
+
+        return data.entrySet().stream()
+            .map(entry -> Base64.encode(entry.getKey()) + ":" + Base64.encode(entry.getValue()))
+            .collect(Collectors.joining(","));
+    }
+
+    /**
+     * Deserialize data from {@link String}
+     */
+    void deserialize(String value) {
+        if (StringUtil.isEmpty(value)) {
+            return;
+        }
+
+        for (String perData : value.split(",")) {
+            // Only data with limited count of elements can be added
+            if (data.size() >= Config.Correlation.ELEMENT_MAX_NUMBER) {
+                break;
+            }
+            final String[] parts = perData.split(":");
+            String perDataKey = parts[0];
+            String perDataValue = parts.length > 1 ? parts[1] : "";
+            data.put(Base64.decode2UTFString(perDataKey), Base64.decode2UTFString(perDataValue));
+        }
+    }
+
+    /**
+     * Prepare for the cross-process propagation. Inject the {@link #data} into {@link ContextCarrier#getCorrelationContext()}
+     */
+    void inject(ContextCarrier carrier) {
+        carrier.getCorrelationContext().data.putAll(this.data);
+    }
+
+    /**
+     * Extra the {@link ContextCarrier#getCorrelationContext()} into this context.
+     */
+    void extract(ContextCarrier carrier) {
+        final Map<String, String> carrierCorrelationContext = carrier.getCorrelationContext().data;
+        for (Map.Entry<String, String> entry : carrierCorrelationContext.entrySet()) {
+            // Only data with limited count of elements can be added
+            if (data.size() >= Config.Correlation.ELEMENT_MAX_NUMBER) {
+                break;
+            }
+
+            this.data.put(entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * Clone the context data, work for capture to cross-thread.
+     */
+    public CorrelationContext clone() {
+        final CorrelationContext context = new CorrelationContext();
+        context.data.putAll(this.data);
+        return context;
+    }
+
+    void continued(ContextSnapshot snapshot) {
+        this.data.putAll(snapshot.getCorrelationContext().data);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CorrelationContext that = (CorrelationContext) o;
+        return Objects.equals(data, that.data);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(data);
+    }
+}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContext.java
index 1a5d8c6..f9aeb4e 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContext.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContext.java
@@ -32,30 +32,33 @@ import org.apache.skywalking.apm.agent.core.context.trace.NoopSpan;
 public class IgnoredTracerContext implements AbstractTracerContext {
     private static final NoopSpan NOOP_SPAN = new NoopSpan();
 
+    private final CorrelationContext correlationContext;
+
     private int stackDepth;
 
     public IgnoredTracerContext() {
         this.stackDepth = 0;
+        this.correlationContext = new CorrelationContext();
     }
 
     @Override
     public void inject(ContextCarrier carrier) {
-
+        this.correlationContext.inject(carrier);
     }
 
     @Override
     public void extract(ContextCarrier carrier) {
-
+        this.correlationContext.extract(carrier);
     }
 
     @Override
     public ContextSnapshot capture() {
-        return new ContextSnapshot(null, -1, null);
+        return new ContextSnapshot(null, -1, null, correlationContext);
     }
 
     @Override
     public void continued(ContextSnapshot snapshot) {
-
+        this.correlationContext.continued(snapshot);
     }
 
     @Override
@@ -105,6 +108,11 @@ public class IgnoredTracerContext implements AbstractTracerContext {
 
     }
 
+    @Override
+    public CorrelationContext getCorrelationContext() {
+        return this.correlationContext;
+    }
+
     public static class ListenerManager {
         private static List<IgnoreTracerContextListener> LISTENERS = new LinkedList<>();
 
diff --git a/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/TraceContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW7CorrelationCarrierItem.java
similarity index 58%
copy from apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/TraceContext.java
copy to apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW7CorrelationCarrierItem.java
index fb9a66e..5566233 100644
--- a/apm-application-toolkit/apm-toolkit-trace/src/main/java/org/apache/skywalking/apm/toolkit/trace/TraceContext.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW7CorrelationCarrierItem.java
@@ -13,24 +13,21 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
-package org.apache.skywalking.apm.toolkit.trace;
+package org.apache.skywalking.apm.agent.core.context;
 
-/**
- * Try to access the sky-walking tracer context. The context is not existed, always. only the middleware, component, or
- * rpc-framework are supported in the current invoke stack, in the same thread, the context will be available.
- * <p>
- */
-public class TraceContext {
+public class SW7CorrelationCarrierItem extends CarrierItem {
+    public static final String HEADER_NAME = "sw7-correlation";
+    private final CorrelationContext correlationContext;
+
+    public SW7CorrelationCarrierItem(CorrelationContext correlationContext, CarrierItem next) {
+        super(HEADER_NAME, correlationContext.serialize(), next);
+        this.correlationContext = correlationContext;
+    }
 
-    /**
-     * Try to get the traceId of current trace context.
-     *
-     * @return traceId, if it exists, or empty {@link String}.
-     */
-    public static String traceId() {
-        return "";
+    @Override
+    public void setHeadValue(String headValue) {
+        this.correlationContext.deserialize(headValue);
     }
 }
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
index a31145e..509d98f 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
@@ -111,6 +111,8 @@ public class TracingContext implements AbstractTracerContext {
      */
     private final ProfileStatusReference profileStatus;
 
+    private final CorrelationContext correlationContext;
+
     /**
      * Initialize all fields with default value.
      */
@@ -130,6 +132,8 @@ public class TracingContext implements AbstractTracerContext {
             PROFILE_TASK_EXECUTION_SERVICE = ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class);
         }
         this.profileStatus = PROFILE_TASK_EXECUTION_SERVICE.addProfiling(this, segment.getTraceSegmentId(), firstOPName);
+
+        this.correlationContext = new CorrelationContext();
     }
 
     /**
@@ -232,6 +236,8 @@ public class TracingContext implements AbstractTracerContext {
         }
 
         carrier.setDistributedTraceIds(this.segment.getRelatedGlobalTraces());
+
+        this.correlationContext.inject(carrier);
     }
 
     /**
@@ -248,6 +254,8 @@ public class TracingContext implements AbstractTracerContext {
         if (span instanceof EntrySpan) {
             span.ref(ref);
         }
+
+        this.correlationContext.extract(carrier);
     }
 
     /**
@@ -259,7 +267,7 @@ public class TracingContext implements AbstractTracerContext {
     public ContextSnapshot capture() {
         List<TraceSegmentRef> refs = this.segment.getRefs();
         ContextSnapshot snapshot = new ContextSnapshot(
-            segment.getTraceSegmentId(), activeSpan().getSpanId(), segment.getRelatedGlobalTraces());
+            segment.getTraceSegmentId(), activeSpan().getSpanId(), segment.getRelatedGlobalTraces(), this.correlationContext);
         int entryOperationId;
         String entryOperationName = "";
         int entryApplicationInstanceId;
@@ -327,6 +335,7 @@ public class TracingContext implements AbstractTracerContext {
         this.segment.ref(segmentRef);
         this.activeSpan().ref(segmentRef);
         this.segment.relatedGlobalTraces(snapshot.getDistributedTraceId());
+        this.correlationContext.continued(snapshot);
     }
 
     /**
@@ -510,6 +519,11 @@ public class TracingContext implements AbstractTracerContext {
         finish();
     }
 
+    @Override
+    public CorrelationContext getCorrelationContext() {
+        return this.correlationContext;
+    }
+
     /**
      * Re-check current trace need profiling, encase third part plugin change the operation name.
      *
diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV2HeaderTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV2HeaderTest.java
index 9ad3b77..8bd53df 100644
--- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV2HeaderTest.java
+++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV2HeaderTest.java
@@ -34,7 +34,13 @@ public class ContextCarrierV2HeaderTest {
         CarrierItem next = contextCarrier.items();
         while (next.hasNext()) {
             next = next.next();
-            next.setHeadValue("1-My40LjU=-MS4yLjM=-4-1-1-IzEyNy4wLjAuMTo4MDgw--");
+            if (next.getHeadKey().equals(SW6CarrierItem.HEADER_NAME)) {
+                next.setHeadValue("1-My40LjU=-MS4yLjM=-4-1-1-IzEyNy4wLjAuMTo4MDgw--");
+            } else if (next.getHeadKey().equals(SW7CorrelationCarrierItem.HEADER_NAME)) {
+                next.setHeadValue("dGVzdA==:dHJ1ZQ==");
+            } else {
+                throw new IllegalArgumentException("Unknown Header: " + next.getHeadKey());
+            }
         }
 
         Assert.assertTrue(contextCarrier.isValid());
@@ -55,6 +61,8 @@ public class ContextCarrierV2HeaderTest {
         contextCarrier.setEntryEndpointName("/portal");
         contextCarrier.setParentEndpointId(123);
 
+        contextCarrier.getCorrelationContext().put("test", "true");
+
         CarrierItem next = contextCarrier.items();
         while (next.hasNext()) {
             next = next.next();
@@ -63,13 +71,30 @@ public class ContextCarrierV2HeaderTest {
              *
              * "1-3.4.5-1.2.3-4-1-1-#127.0.0.1:8080-#/portal-123"
              */
-            Assert.assertEquals("1-My40LjU=-MS4yLjM=-4-1-1-IzEyNy4wLjAuMTo4MDgw-Iy9wb3J0YWw=-MTIz", next.getHeadValue());
+            if (next.getHeadKey().equals(SW6CarrierItem.HEADER_NAME)) {
+                Assert.assertEquals("1-My40LjU=-MS4yLjM=-4-1-1-IzEyNy4wLjAuMTo4MDgw-Iy9wb3J0YWw=-MTIz", next.getHeadValue());
+            } else if (next.getHeadKey().equals(SW7CorrelationCarrierItem.HEADER_NAME)) {
+                /**
+                 * customKey:customValue
+                 *
+                 * "test:true"
+                 */
+                Assert.assertEquals("dGVzdA==:dHJ1ZQ==", next.getHeadValue());
+            } else {
+                throw new IllegalArgumentException("Unknown Header: " + next.getHeadKey());
+            }
         }
 
         next = contextCarrier.items();
         while (next.hasNext()) {
             next = next.next();
-            Assert.assertEquals("1-My40LjU=-MS4yLjM=-4-1-1-IzEyNy4wLjAuMTo4MDgw-Iy9wb3J0YWw=-MTIz", next.getHeadValue());
+            if (next.getHeadKey().equals(SW6CarrierItem.HEADER_NAME)) {
+                Assert.assertEquals("1-My40LjU=-MS4yLjM=-4-1-1-IzEyNy4wLjAuMTo4MDgw-Iy9wb3J0YWw=-MTIz", next.getHeadValue());
+            } else if (next.getHeadKey().equals(SW7CorrelationCarrierItem.HEADER_NAME)) {
+                Assert.assertEquals("dGVzdA==:dHJ1ZQ==", next.getHeadValue());
+            } else {
+                throw new IllegalArgumentException("Unknown Header: " + next.getHeadKey());
+            }
         }
 
         Assert.assertTrue(contextCarrier.isValid());
@@ -90,18 +115,33 @@ public class ContextCarrierV2HeaderTest {
         contextCarrier.setEntryEndpointName("/portal");
         contextCarrier.setParentEndpointId(123);
 
+        contextCarrier.getCorrelationContext().put("test", "true");
+
         CarrierItem next = contextCarrier.items();
-        String headerValue = null;
+        String sw6HeaderValue = null;
+        String correlationHeaderValue = null;
         while (next.hasNext()) {
             next = next.next();
-            headerValue = next.getHeadValue();
+            if (next.getHeadKey().equals(SW6CarrierItem.HEADER_NAME)) {
+                sw6HeaderValue = next.getHeadValue();
+            } else if (next.getHeadKey().equals(SW7CorrelationCarrierItem.HEADER_NAME)) {
+                correlationHeaderValue = next.getHeadValue();
+            } else {
+                throw new IllegalArgumentException("Unknown Header: " + next.getHeadKey());
+            }
         }
 
         ContextCarrier contextCarrier2 = new ContextCarrier();
         next = contextCarrier2.items();
         while (next.hasNext()) {
             next = next.next();
-            next.setHeadValue(headerValue);
+            if (next.getHeadKey().equals(SW6CarrierItem.HEADER_NAME)) {
+                next.setHeadValue(sw6HeaderValue);
+            } else if (next.getHeadKey().equals(SW7CorrelationCarrierItem.HEADER_NAME)) {
+                next.setHeadValue(correlationHeaderValue);
+            } else {
+                throw new IllegalArgumentException("Unknown Header: " + next.getHeadKey());
+            }
         }
 
         Assert.assertTrue(contextCarrier2.isValid());
@@ -112,5 +152,6 @@ public class ContextCarrierV2HeaderTest {
         Assert.assertEquals(contextCarrier.getEntryEndpointName(), contextCarrier2.getEntryEndpointName());
         Assert.assertEquals(contextCarrier.getEntryServiceInstanceId(), contextCarrier2.getEntryServiceInstanceId());
         Assert.assertEquals(contextCarrier.getParentServiceInstanceId(), contextCarrier2.getParentServiceInstanceId());
+        Assert.assertEquals(contextCarrier.getCorrelationContext(), contextCarrier2.getCorrelationContext());
     }
 }
diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/CorrelationContextTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/CorrelationContextTest.java
new file mode 100644
index 0000000..11db7b3
--- /dev/null
+++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/CorrelationContextTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.context;
+
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Optional;
+
+public class CorrelationContextTest {
+
+    @Before
+    public void setupConfig() {
+        Config.Correlation.ELEMENT_MAX_NUMBER = 2;
+        Config.Correlation.VALUE_MAX_LENGTH = 8;
+    }
+
+    @Test
+    public void testSet() {
+        final CorrelationContext context = new CorrelationContext();
+
+        // manual set
+        Optional<String> previous = context.put("test1", "t1");
+        Assert.assertNotNull(previous);
+        Assert.assertFalse(previous.isPresent());
+
+        // set with replace old value
+        previous = context.put("test1", "t1New");
+        Assert.assertNotNull(previous);
+        Assert.assertEquals("t1", previous.get());
+
+        // manual set
+        previous = context.put("test2", "t2");
+        Assert.assertNotNull(previous);
+        Assert.assertFalse(previous.isPresent());
+
+        // out of key count
+        previous = context.put("test3", "t3");
+        Assert.assertNotNull(previous);
+        Assert.assertFalse(previous.isPresent());
+
+        // key not null
+        previous = context.put(null, "t3");
+        Assert.assertNotNull(previous);
+        Assert.assertFalse(previous.isPresent());
+
+        // out of value length
+        previous = context.put(null, "123456789");
+        Assert.assertNotNull(previous);
+        Assert.assertFalse(previous.isPresent());
+    }
+
+    @Test
+    public void testGet() {
+        final CorrelationContext context = new CorrelationContext();
+        context.put("test1", "t1");
+
+        // manual get
+        Assert.assertEquals("t1", context.get("test1").get());
+        // ket if null
+        Assert.assertNull(context.get(null).orElse(null));
+        // value if null
+        context.put("test2", null);
+        Assert.assertNull(context.get("test2").orElse(null));
+    }
+
+    @Test
+    public void testSerialize() {
+        // manual
+        CorrelationContext context = new CorrelationContext();
+        context.put("test1", "t1");
+        context.put("test2", "t2");
+        Assert.assertEquals("dGVzdDE=:dDE=,dGVzdDI=:dDI=", context.serialize());
+
+        // empty value
+        context = new CorrelationContext();
+        context.put("test1", null);
+        Assert.assertEquals("", context.serialize());
+
+        // empty
+        context = new CorrelationContext();
+        Assert.assertEquals("", context.serialize());
+    }
+
+    @Test
+    public void testDeserialize() {
+        // manual
+        CorrelationContext context = new CorrelationContext();
+        context.deserialize("dGVzdDE=:dDE=,dGVzdDI=:dDI=");
+        Assert.assertEquals("t1", context.get("test1").get());
+        Assert.assertEquals("t2", context.get("test2").get());
+
+        // empty value
+        context = new CorrelationContext();
+        context.deserialize("dGVzdDE=:");
+        Assert.assertEquals("", context.get("test1").get());
+
+        // empty string
+        context = new CorrelationContext();
+        context.deserialize("");
+        Assert.assertNull(context.get("test1").orElse(null));
+        context.deserialize(null);
+        Assert.assertNull(context.get("test1").orElse(null));
+    }
+
+}
diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java
index 4d4f1ab..0b8454d 100644
--- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java
+++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.plugin.finagle;
 import com.twitter.io.Bufs;
 import org.apache.skywalking.apm.agent.core.context.CarrierItem;
 import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.SW6CarrierItem;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -53,7 +54,9 @@ public class CodecUtilsTest {
         CarrierItem next = contextCarrier.items();
         while (next.hasNext()) {
             next = next.next();
-            next.setHeadValue(UUID.randomUUID().toString());
+            if (next.getHeadKey().equals(SW6CarrierItem.HEADER_NAME)) {
+                next.setHeadValue(UUID.randomUUID().toString());
+            }
         }
         SWContextCarrier swContextCarrier = new SWContextCarrier();
         swContextCarrier.setContextCarrier(contextCarrier);
diff --git a/apm-sniffer/apm-sdk-plugin/httpClient-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/httpClient/v4/HttpClientExecuteInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/httpClient-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/httpClient/v4/HttpClientExecuteInterceptorTest.java
index 09b6323..9f491cf 100644
--- a/apm-sniffer/apm-sdk-plugin/httpClient-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/httpClient/v4/HttpClientExecuteInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/httpClient-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/httpClient/v4/HttpClientExecuteInterceptorTest.java
@@ -53,6 +53,7 @@ import static junit.framework.TestCase.assertNotNull;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -132,7 +133,7 @@ public class HttpClientExecuteInterceptorTest {
 
         List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
         assertHttpSpan(spans.get(0));
-        verify(request).setHeader(anyString(), anyString());
+        verify(request, times(2)).setHeader(anyString(), anyString());
     }
 
     @Test
@@ -153,7 +154,7 @@ public class HttpClientExecuteInterceptorTest {
 
         assertHttpSpan(spans.get(0));
         assertThat(SpanHelper.getErrorOccurred(spans.get(0)), is(true));
-        verify(request).setHeader(anyString(), anyString());
+        verify(request, times(2)).setHeader(anyString(), anyString());
     }
 
     @Test
@@ -171,7 +172,7 @@ public class HttpClientExecuteInterceptorTest {
         assertHttpSpan(span);
         assertThat(SpanHelper.getErrorOccurred(span), is(true));
         assertHttpSpanErrorLog(SpanHelper.getLogs(span));
-        verify(request).setHeader(anyString(), anyString());
+        verify(request, times(2)).setHeader(anyString(), anyString());
 
     }
 
@@ -201,7 +202,7 @@ public class HttpClientExecuteInterceptorTest {
 
         List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
         assertHttpSpan(spans.get(0));
-        verify(request).setHeader(anyString(), anyString());
+        verify(request, times(2)).setHeader(anyString(), anyString());
     }
 
     private void assertHttpSpanErrorLog(List<LogDataEntity> logs) {
diff --git a/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/HttpAsyncClientInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/HttpAsyncClientInterceptorTest.java
index fd262c4..15032d8 100644
--- a/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/HttpAsyncClientInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/HttpAsyncClientInterceptorTest.java
@@ -191,7 +191,7 @@ public class HttpAsyncClientInterceptorTest {
 
         List<AbstractTracingSpan> spans = SegmentHelper.getSpans(findNeedSegemnt());
         assertHttpSpan(spans.get(0));
-        verify(requestWrapper).setHeader(anyString(), anyString());
+        verify(requestWrapper, times(2)).setHeader(anyString(), anyString());
 
     }
 
diff --git a/apm-sniffer/apm-sdk-plugin/motan-plugin/src/test/java/org/apache/skywalking/apm/plugin/motan/MotanConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/motan-plugin/src/test/java/org/apache/skywalking/apm/plugin/motan/MotanConsumerInterceptorTest.java
index 6867b7c..11d2b83 100644
--- a/apm-sniffer/apm-sdk-plugin/motan-plugin/src/test/java/org/apache/skywalking/apm/plugin/motan/MotanConsumerInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/motan-plugin/src/test/java/org/apache/skywalking/apm/plugin/motan/MotanConsumerInterceptorTest.java
@@ -47,6 +47,7 @@ import org.powermock.modules.junit4.PowerMockRunnerDelegate;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.apache.skywalking.apm.agent.test.tools.SpanAssert.assertComponent;
@@ -92,7 +93,7 @@ public class MotanConsumerInterceptorTest {
         TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
         List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
         assertMotanConsumerSpan(spans.get(0));
-        verify(request).setAttachment(anyString(), anyString());
+        verify(request, times(2)).setAttachment(anyString(), anyString());
     }
 
     @Test
@@ -110,7 +111,7 @@ public class MotanConsumerInterceptorTest {
 
     private void assertTraceSegmentWhenOccurException(AbstractTracingSpan tracingSpan) {
         assertMotanConsumerSpan(tracingSpan);
-        verify(request).setAttachment(anyString(), anyString());
+        verify(request, times(2)).setAttachment(anyString(), anyString());
         List<LogDataEntity> logDataEntities = SpanHelper.getLogs(tracingSpan);
         assertThat(logDataEntities.size(), is(1));
         SpanAssert.assertException(logDataEntities.get(0), RuntimeException.class);
diff --git a/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-0.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/ProducerOperationHandlerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-0.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/ProducerOperationHandlerInterceptorTest.java
index 23009a4..82cb9e7 100644
--- a/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-0.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/ProducerOperationHandlerInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-0.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/ProducerOperationHandlerInterceptorTest.java
@@ -51,6 +51,7 @@ import org.powermock.modules.junit4.PowerMockRunnerDelegate;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -118,7 +119,7 @@ public class ProducerOperationHandlerInterceptorTest {
 
         List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
         assertCombSpan(spans.get(0));
-        verify(invocation).getContext();
+        verify(invocation, times(2)).getContext();
     }
 
     private void assertCombSpan(AbstractTracingSpan span) {
diff --git a/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-0.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/TransportClientHandlerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-0.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/TransportClientHandlerInterceptorTest.java
index f0357c5..a7c0aa0 100644
--- a/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-0.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/TransportClientHandlerInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-0.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/TransportClientHandlerInterceptorTest.java
@@ -51,6 +51,7 @@ import org.powermock.modules.junit4.PowerMockRunnerDelegate;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -113,7 +114,7 @@ public class TransportClientHandlerInterceptorTest {
         TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
         List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
         assertCombSpan(spans.get(0));
-        verify(invocation).getContext();
+        verify(invocation, times(2)).getContext();
     }
 
     private void assertCombSpan(AbstractTracingSpan span) {
diff --git a/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-1.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/v1/ProducerOperationHandlerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-1.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/v1/ProducerOperationHandlerInterceptorTest.java
index c8e96a6..d3147c2 100644
--- a/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-1.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/v1/ProducerOperationHandlerInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-1.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/v1/ProducerOperationHandlerInterceptorTest.java
@@ -51,6 +51,7 @@ import org.powermock.modules.junit4.PowerMockRunnerDelegate;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -118,7 +119,7 @@ public class ProducerOperationHandlerInterceptorTest {
 
         List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
         assertCombSpan(spans.get(0));
-        verify(invocation).getContext();
+        verify(invocation, times(2)).getContext();
     }
 
     private void assertCombSpan(AbstractTracingSpan span) {
diff --git a/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-1.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/v1/TransportClientHandlerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-1.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/v1/TransportClientHandlerInterceptorTest.java
index 54e6927..fac1161 100644
--- a/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-1.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/v1/TransportClientHandlerInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/servicecomb-plugin/servicecomb-java-chassis-1.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/servicecomb/v1/TransportClientHandlerInterceptorTest.java
@@ -51,6 +51,7 @@ import org.powermock.modules.junit4.PowerMockRunnerDelegate;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -113,7 +114,7 @@ public class TransportClientHandlerInterceptorTest {
         TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
         List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
         assertCombSpan(spans.get(0));
-        verify(invocation).getContext();
+        verify(invocation, times(2)).getContext();
     }
 
     private void assertCombSpan(AbstractTracingSpan span) {
diff --git a/apm-sniffer/apm-test-tools/src/main/java/org/apache/skywalking/apm/agent/core/context/MockContextSnapshot.java b/apm-sniffer/apm-test-tools/src/main/java/org/apache/skywalking/apm/agent/core/context/MockContextSnapshot.java
index afc4f8a..e725999 100644
--- a/apm-sniffer/apm-test-tools/src/main/java/org/apache/skywalking/apm/agent/core/context/MockContextSnapshot.java
+++ b/apm-sniffer/apm-test-tools/src/main/java/org/apache/skywalking/apm/agent/core/context/MockContextSnapshot.java
@@ -33,7 +33,7 @@ public enum MockContextSnapshot {
         List<DistributedTraceId> distributedTraceIds = new ArrayList<DistributedTraceId>();
         distributedTraceIds.add(new NewDistributedTraceId());
 
-        contextSnapshot = new ContextSnapshot(new ID(1, 2, 3), 1, distributedTraceIds);
+        contextSnapshot = new ContextSnapshot(new ID(1, 2, 3), 1, distributedTraceIds, new CorrelationContext());
         contextSnapshot.setEntryApplicationInstanceId(1);
         contextSnapshot.setEntryOperationId(0);
         contextSnapshot.setEntryOperationName("/for-test-entryOperationName");
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/CorrelationContextGetInterceptor.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/CorrelationContextGetInterceptor.java
new file mode 100644
index 0000000..db31f29
--- /dev/null
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/CorrelationContextGetInterceptor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.toolkit.activation.trace;
+
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
+
+import java.lang.reflect.Method;
+import java.util.Optional;
+
+public class CorrelationContextGetInterceptor implements StaticMethodsAroundInterceptor {
+
+    @Override
+    public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes, MethodInterceptResult result) {
+        final String key = (String) allArguments[0];
+        final Optional<String> data = ContextManager.getCorrelationContext().get(key);
+
+        result.defineReturnValue(data);
+    }
+
+    @Override
+    public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes, Object ret) {
+        return ret;
+    }
+
+    @Override
+    public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes, Throwable t) {
+    }
+}
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/CorrelationContextPutInterceptor.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/CorrelationContextPutInterceptor.java
new file mode 100644
index 0000000..41f81d4
--- /dev/null
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/CorrelationContextPutInterceptor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.toolkit.activation.trace;
+
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
+
+import java.lang.reflect.Method;
+import java.util.Optional;
+
+public class CorrelationContextPutInterceptor implements StaticMethodsAroundInterceptor {
+
+    @Override
+    public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes, MethodInterceptResult result) {
+        final String key = (String) allArguments[0];
+        final String value = (String) allArguments[1];
+        final Optional<String> previous = ContextManager.getCorrelationContext().put(key, value);
+
+        result.defineReturnValue(previous);
+    }
+
+    @Override
+    public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes, Object ret) {
+        return ret;
+    }
+
+    @Override
+    public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes, Throwable t) {
+    }
+}
diff --git a/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/TraceContextActivation.java b/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/TraceContextActivation.java
index 33dcc70..80c34eb 100644
--- a/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/TraceContextActivation.java
+++ b/apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/src/main/java/org/apache/skywalking/apm/toolkit/activation/trace/TraceContextActivation.java
@@ -38,6 +38,10 @@ public class TraceContextActivation extends ClassStaticMethodsEnhancePluginDefin
     public static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.toolkit.activation.trace.TraceContextInterceptor";
     public static final String ENHANCE_CLASS = "org.apache.skywalking.apm.toolkit.trace.TraceContext";
     public static final String ENHANCE_METHOD = "traceId";
+    public static final String ENHANCE_GET_CORRELATION_METHOD = "getCorrelation";
+    public static final String INTERCEPT_GET_CORRELATION_CLASS = "org.apache.skywalking.apm.toolkit.activation.trace.CorrelationContextGetInterceptor";
+    public static final String ENHANCE_PUT_CORRELATION_METHOD = "putCorrelation";
+    public static final String INTERCEPT_PUT_CORRELATION_CLASS = "org.apache.skywalking.apm.toolkit.activation.trace.CorrelationContextPutInterceptor";
 
     /**
      * @return the target class, which needs active.
@@ -69,6 +73,38 @@ public class TraceContextActivation extends ClassStaticMethodsEnhancePluginDefin
                 public boolean isOverrideArgs() {
                     return false;
                 }
+            },
+            new StaticMethodsInterceptPoint() {
+                @Override
+                public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return named(ENHANCE_GET_CORRELATION_METHOD);
+                }
+
+                @Override
+                public String getMethodsInterceptor() {
+                    return INTERCEPT_GET_CORRELATION_CLASS;
+                }
+
+                @Override
+                public boolean isOverrideArgs() {
+                    return false;
+                }
+            },
+            new StaticMethodsInterceptPoint() {
+                @Override
+                public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return named(ENHANCE_PUT_CORRELATION_METHOD);
+                }
+
+                @Override
+                public String getMethodsInterceptor() {
+                    return INTERCEPT_PUT_CORRELATION_CLASS;
+                }
+
+                @Override
+                public boolean isOverrideArgs() {
+                    return false;
+                }
             }
         };
     }
diff --git a/docs/en/protocols/Skywalking-Cross-Process-Correlation-Headers-Protocol-v1.md b/docs/en/protocols/Skywalking-Cross-Process-Correlation-Headers-Protocol-v1.md
index 211c15a..fa126d3 100644
--- a/docs/en/protocols/Skywalking-Cross-Process-Correlation-Headers-Protocol-v1.md
+++ b/docs/en/protocols/Skywalking-Cross-Process-Correlation-Headers-Protocol-v1.md
@@ -9,7 +9,7 @@ Cross Process Correlation Header key is `sw7-correlation`. The value is the `enc
 ## Recommendations of language APIs
 Recommended implementation in different language API.
 
-1. `CorrelationContext#set` and `CorrelationContext#get` are recommended to write and read the correlation context, with key/value string.
+1. `TraceContext#putCorrelation` and `TraceContext#getCorrelation` are recommended to write and read the correlation context, with key/value string.
 1. The key should be added if it is absent.
 1. The later writes should override the previous value.
 1. The total number of all keys should be less than 3, and the length of each value should be less than 128 bytes.
diff --git a/docs/en/setup/service-agent/java-agent/Application-toolkit-trace.md b/docs/en/setup/service-agent/java-agent/Application-toolkit-trace.md
index 4b61f31..3586367 100644
--- a/docs/en/setup/service-agent/java-agent/Application-toolkit-trace.md
+++ b/docs/en/setup/service-agent/java-agent/Application-toolkit-trace.md
@@ -53,3 +53,14 @@ public User methodYouWantToTrace(String param1, String param2) {
 }
 ```
 
+* Use `TraceContext.putCorrelation()` API to put custom data in tracing context. 
+```java
+Optional<String> previous = TraceContext.putCorrelation("customKey", "customValue");
+```
+CorrelationContext will remove the item when the value is `null` or empty.
+
+* Use `TraceContext.getCorrelation()` API to get custom data.
+```java
+Optional<String> value = TraceContext.getCorrelation("customKey");
+```
+CorrelationContext configuration descriptions could be found in [the agent configuration](README.md#table-of-agent-configuration-properties) documentation, with `correlation.` as the prefix.
diff --git a/docs/en/setup/service-agent/java-agent/README.md b/docs/en/setup/service-agent/java-agent/README.md
index 135c92f..21e6ca9 100755
--- a/docs/en/setup/service-agent/java-agent/README.md
+++ b/docs/en/setup/service-agent/java-agent/README.md
@@ -125,6 +125,8 @@ property key | Description | Default |
 `plugin.tomcat.collect_http_params`| This config item controls that whether the Tomcat plugin should collect the parameters of the request. Also, activate implicitly in the profiled trace. | `false` |
 `plugin.springmvc.collect_http_params`| This config item controls that whether the SpringMVC plugin should collect the parameters of the request, when your Spring application is based on Tomcat, consider only setting either `plugin.tomcat.collect_http_params` or `plugin.springmvc.collect_http_params`. Also, activate implicitly in the profiled trace. | `false` |
 `plugin.http.http_params_length_threshold`| When `COLLECT_HTTP_PARAMS` is enabled, how many characters to keep and send to the OAP backend, use negative values to keep and send the complete parameters, NB. this config item is added for the sake of performance.  | `1024` |
+`correlation.element_max_number`|Max element count of the correlation context.|`3`|
+`correlation.value_max_length`|Max value length of correlation context element.|`128`|
 
 ## Optional Plugins
 Java agent plugins are all pluggable. Optional plugins could be provided in `optional-plugins` folder under agent or 3rd party repositories.
@@ -158,7 +160,7 @@ Now, we have the following known bootstrap plugins.
     * If you want to use OpenTracing Java APIs, try [SkyWalking OpenTracing compatible tracer](Opentracing.md). More details you could find at http://opentracing.io
     * If you want to print trace context(e.g. traceId) in your logs, choose the log frameworks, [log4j](Application-toolkit-log4j-1.x.md), 
 [log4j2](Application-toolkit-log4j-2.x.md), [logback](Application-toolkit-logback-1.x.md)
-    * If you want to use annotations or SkyWalking native APIs to read context, try [SkyWalking manual APIs](Application-toolkit-trace.md)
+    * If you want your codes to interact with SkyWalking agent, including `getting trace id`, `setting tags`, `propagating custom data` etc.. Try [SkyWalking manual APIs](Application-toolkit-trace.md).
     * If you want to continue traces across thread manually, use [across thread solution APIs](Application-toolkit-trace-cross-thread.md).
 * If you want to specify the path of your agent.config file. Read [set config file through system properties](Specified-agent-config.md)
 
diff --git a/test/plugin/scenarios/apm-toolkit-trace-scenario/config/expectedData.yaml b/test/plugin/scenarios/apm-toolkit-trace-scenario/config/expectedData.yaml
index 1f3d596..80800c7 100644
--- a/test/plugin/scenarios/apm-toolkit-trace-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/apm-toolkit-trace-scenario/config/expectedData.yaml
@@ -194,6 +194,7 @@ segmentItems:
             tags:
               - {key: url, value: 'http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/callable'}
               - {key: http.method, value: GET}
+              - {key: correlation, value: correlationValueTest}
             refs:
               - {parentEndpointId: -1, parentEndpoint: '',
                  networkAddressId: 0, entryEndpointId: 0, refType: CrossProcess, parentSpanId: 1,
@@ -289,6 +290,7 @@ segmentItems:
             tags:
               - {key: url, value: 'http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/runnable'}
               - {key: http.method, value: GET}
+              - {key: correlation, value: correlationValueTest}
             refs:
               - {parentEndpointId: -1, parentEndpoint: '',
                  networkAddressId: 0, entryEndpointId: 0, refType: CrossProcess, parentSpanId: 1,
@@ -348,6 +350,7 @@ segmentItems:
             tags:
               - {key: url, value: 'http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/supplier'}
               - {key: http.method, value: GET}
+              - {key: correlation, value: correlationValueTest}
             refs:
               - {parentEndpointId: -1, parentEndpoint: '',
                  networkAddressId: 0, entryEndpointId: 0, refType: CrossProcess, parentSpanId: 1,
diff --git a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/TraceContext.java b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/TraceContext.java
index fb9a66e..c846f44 100644
--- a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/TraceContext.java
+++ b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/org/apache/skywalking/apm/toolkit/trace/TraceContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.skywalking.apm.toolkit.trace;
 
+import java.util.Optional;
+
 /**
  * Try to access the sky-walking tracer context. The context is not existed, always. only the middleware, component, or
  * rpc-framework are supported in the current invoke stack, in the same thread, the context will be available.
@@ -33,4 +35,23 @@ public class TraceContext {
     public static String traceId() {
         return "";
     }
+
+    /**
+     * Try to get the custom value from trace context.
+     *
+     * @return custom data value.
+     */
+    public static Optional<String> getCorrelation(String key) {
+        return Optional.empty();
+    }
+
+    /**
+     * Put the custom key/value into trace context.
+     *
+     * @return previous value if it exists.
+     */
+    public static Optional<String> putCorrelation(String key, String value) {
+        return Optional.empty();
+    }
+
 }
diff --git a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/toolkit/controller/TestController.java b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/toolkit/controller/TestController.java
index cf7941a..efd761c 100644
--- a/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/toolkit/controller/TestController.java
+++ b/test/plugin/scenarios/apm-toolkit-trace-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/toolkit/controller/TestController.java
@@ -25,6 +25,8 @@ import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
+import org.apache.skywalking.apm.toolkit.trace.ActiveSpan;
+import org.apache.skywalking.apm.toolkit.trace.TraceContext;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
@@ -35,6 +37,10 @@ public class TestController {
 
     private static final String SUCCESS = "Success";
 
+    private static final String CORRELATION_CONTEXT_KEY = "toolkit-test";
+    private static final String CORRELATION_CONTEXT_VALUE = "correlationValueTest";
+    private static final String CORRELATION_CONTEXT_TAG_KEY = "correlation";
+
     @Autowired
     private TestService testService;
 
@@ -48,6 +54,7 @@ public class TestController {
         testService.testErrorThrowable();
         testService.testTagAnnotation("testTagAnnotationParam1", "testTagAnnotationParam2");
         testService.testTagAnnotationReturnInfo("zhangsan", 15);
+        TraceContext.putCorrelation(CORRELATION_CONTEXT_KEY, CORRELATION_CONTEXT_VALUE);
         testService.asyncCallable(() -> {
             visit("http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/callable");
             return true;
@@ -77,16 +84,19 @@ public class TestController {
 
     @RequestMapping("/asyncVisit/runnable")
     public String asyncVisitRunnable() {
+        ActiveSpan.tag(CORRELATION_CONTEXT_TAG_KEY, TraceContext.getCorrelation(CORRELATION_CONTEXT_KEY).orElse(""));
         return SUCCESS;
     }
 
     @RequestMapping("/asyncVisit/callable")
     public String asyncVisitCallable() {
+        ActiveSpan.tag(CORRELATION_CONTEXT_TAG_KEY, TraceContext.getCorrelation(CORRELATION_CONTEXT_KEY).orElse(""));
         return SUCCESS;
     }
 
     @RequestMapping("/asyncVisit/supplier")
     public String asyncVisitSupplier() {
+        ActiveSpan.tag(CORRELATION_CONTEXT_TAG_KEY, TraceContext.getCorrelation(CORRELATION_CONTEXT_KEY).orElse(""));
         return SUCCESS;
     }