You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/11/19 09:08:26 UTC
[incubator-skywalking] branch v6-protocol updated: Make backend
supports new trace protocol
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch v6-protocol
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/v6-protocol by this push:
new 3d0486c Make backend supports new trace protocol
3d0486c is described below
commit 3d0486cb4b22d5a56a61eb77d57b8eeb1556531f
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Nov 19 17:08:16 2018 +0800
Make backend supports new trace protocol
---
.../apm/agent/core/context/ContextCarrier.java | 92 ++++++-------
.../apm/agent/core/context/TracingContext.java | 12 +-
.../agent/core/context/trace/TraceSegmentRef.java | 8 +-
.../core/context/ContextCarrierV2HeaderTest.java | 22 ++--
.../apm/agent/core/context/ContextManagerTest.java | 6 +-
.../core/context/IgnoredTracerContextTest.java | 2 +-
docs/en/protocols/Trace-Data-Protocol.md | 4 +-
.../analysis/manual/segment/SegmentDispatcher.java | 1 +
.../analysis/manual/segment/SegmentRecord.java | 4 +
.../skywalking/oap/server/core/source/Segment.java | 1 +
.../trace/provider/TraceModuleProvider.java | 18 ++-
.../v6/grpc/TraceSegmentReportServiceHandler.java | 66 ++++++++++
.../trace/provider/parser/SegmentParse.java | 38 ++++--
.../{SegmentParse.java => SegmentParseV2.java} | 70 ++++++----
.../provider/parser/SegmentParserServiceImpl.java | 6 +-
.../SegmentCoreInfo.java => SegmentSource.java} | 19 +--
.../parser/decorator/ReferenceDecorator.java | 133 +++++++++++++------
.../provider/parser/decorator/SegmentCoreInfo.java | 5 +-
.../parser/decorator/SegmentDecorator.java | 48 +++++--
.../provider/parser/decorator/SpanDecorator.java | 143 +++++++++++++++------
.../listener/endpoint/MultiScopesSpanListener.java | 18 +--
.../listener/segment/SegmentSpanListener.java | 6 +-
.../service/ServiceMappingSpanListener.java | 4 +-
.../standardization/ReferenceIdExchanger.java | 24 ++--
24 files changed, 499 insertions(+), 251 deletions(-)
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java
index 8ff218d..91d728e 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java
@@ -49,12 +49,12 @@ public class ContextCarrier implements Serializable {
/**
* id of parent application instance, it's the id assigned by collector.
*/
- private int parentApplicationInstanceId = DictionaryUtil.nullValue();
+ private int parentServiceInstanceId = DictionaryUtil.nullValue();
/**
* id of first application instance in this distributed trace, it's the id assigned by collector.
*/
- private int entryApplicationInstanceId = DictionaryUtil.nullValue();
+ private int entryServiceInstanceId = DictionaryUtil.nullValue();
/**
* peer(ipv4s/ipv6/hostname + port) of the server, from client side.
@@ -64,12 +64,12 @@ public class ContextCarrier implements Serializable {
/**
* Operation/Service name of the first one in this distributed trace. This name may be compressed to an integer.
*/
- private String entryOperationName;
+ private String entryEndpointName;
/**
* Operation/Service name of the parent one in this distributed trace. This name may be compressed to an integer.
*/
- private String parentOperationName;
+ private String parentEndpointName;
/**
* {@link DistributedTraceId}, also known as TraceId
@@ -106,11 +106,11 @@ public class ContextCarrier implements Serializable {
return StringUtil.join('|',
this.getTraceSegmentId().encode(),
this.getSpanId() + "",
- this.getParentApplicationInstanceId() + "",
- this.getEntryApplicationInstanceId() + "",
+ this.getParentServiceInstanceId() + "",
+ this.getEntryServiceInstanceId() + "",
this.getPeerHost(),
- this.getEntryOperationName(),
- this.getParentOperationName(),
+ this.getEntryEndpointName(),
+ this.getParentEndpointName(),
this.getPrimaryDistributedTraceId().encode());
} else {
return "";
@@ -122,11 +122,11 @@ public class ContextCarrier implements Serializable {
Base64.encode(this.getPrimaryDistributedTraceId().encode()),
Base64.encode(this.getTraceSegmentId().encode()),
this.getSpanId() + "",
- this.getParentApplicationInstanceId() + "",
- this.getEntryApplicationInstanceId() + "",
+ this.getParentServiceInstanceId() + "",
+ this.getEntryServiceInstanceId() + "",
Base64.encode(this.getPeerHost()),
- Base64.encode(this.getEntryOperationName()),
- Base64.encode(this.getParentOperationName()));
+ Base64.encode(this.getEntryEndpointName()),
+ Base64.encode(this.getParentEndpointName()));
} else {
return "";
}
@@ -153,11 +153,11 @@ public class ContextCarrier implements Serializable {
try {
this.traceSegmentId = new ID(parts[0]);
this.spanId = Integer.parseInt(parts[1]);
- this.parentApplicationInstanceId = Integer.parseInt(parts[2]);
- this.entryApplicationInstanceId = Integer.parseInt(parts[3]);
+ this.parentServiceInstanceId = Integer.parseInt(parts[2]);
+ this.entryServiceInstanceId = Integer.parseInt(parts[3]);
this.peerHost = parts[4];
- this.entryOperationName = parts[5];
- this.parentOperationName = parts[6];
+ this.entryEndpointName = parts[5];
+ this.parentEndpointName = parts[6];
this.primaryDistributedTraceId = new PropagatedTraceId(parts[7]);
} catch (NumberFormatException e) {
@@ -171,11 +171,11 @@ public class ContextCarrier implements Serializable {
this.primaryDistributedTraceId = new PropagatedTraceId(Base64.decode2UTFString(parts[1]));
this.traceSegmentId = new ID(Base64.decode2UTFString(parts[2]));
this.spanId = Integer.parseInt(parts[3]);
- this.parentApplicationInstanceId = Integer.parseInt(parts[4]);
- this.entryApplicationInstanceId = Integer.parseInt(parts[5]);
+ this.parentServiceInstanceId = Integer.parseInt(parts[4]);
+ this.entryServiceInstanceId = Integer.parseInt(parts[5]);
this.peerHost = Base64.decode2UTFString(parts[6]);
- this.entryOperationName = Base64.decode2UTFString(parts[7]);
- this.parentOperationName = Base64.decode2UTFString(parts[8]);
+ this.entryEndpointName = Base64.decode2UTFString(parts[7]);
+ this.parentEndpointName = Base64.decode2UTFString(parts[8]);
} catch (NumberFormatException e) {
}
@@ -201,18 +201,18 @@ public class ContextCarrier implements Serializable {
return traceSegmentId != null
&& traceSegmentId.isValid()
&& getSpanId() > -1
- && parentApplicationInstanceId != DictionaryUtil.nullValue()
- && entryApplicationInstanceId != DictionaryUtil.nullValue()
+ && parentServiceInstanceId != DictionaryUtil.nullValue()
+ && entryServiceInstanceId != DictionaryUtil.nullValue()
&& !StringUtil.isEmpty(peerHost)
- && !StringUtil.isEmpty(entryOperationName)
- && !StringUtil.isEmpty(parentOperationName)
+ && !StringUtil.isEmpty(entryEndpointName)
+ && !StringUtil.isEmpty(parentEndpointName)
&& primaryDistributedTraceId != null;
} else if (HeaderVersion.v2.equals(version)) {
return traceSegmentId != null
&& traceSegmentId.isValid()
&& getSpanId() > -1
- && parentApplicationInstanceId != DictionaryUtil.nullValue()
- && entryApplicationInstanceId != DictionaryUtil.nullValue()
+ && parentServiceInstanceId != DictionaryUtil.nullValue()
+ && entryServiceInstanceId != DictionaryUtil.nullValue()
&& !StringUtil.isEmpty(peerHost)
&& primaryDistributedTraceId != null;
} else {
@@ -220,24 +220,24 @@ public class ContextCarrier implements Serializable {
}
}
- public String getEntryOperationName() {
- return entryOperationName;
+ public String getEntryEndpointName() {
+ return entryEndpointName;
}
- void setEntryOperationName(String entryOperationName) {
- this.entryOperationName = '#' + entryOperationName;
+ void setEntryEndpointName(String entryEndpointName) {
+ this.entryEndpointName = '#' + entryEndpointName;
}
- void setEntryOperationId(int entryOperationId) {
- this.entryOperationName = entryOperationId + "";
+ void setEntryEndpointId(int entryOperationId) {
+ this.entryEndpointName = entryOperationId + "";
}
- void setParentOperationName(String parentOperationName) {
- this.parentOperationName = '#' + parentOperationName;
+ void setParentEndpointName(String parentEndpointName) {
+ this.parentEndpointName = '#' + parentEndpointName;
}
- void setParentOperationId(int parentOperationId) {
- this.parentOperationName = parentOperationId + "";
+ void setParentEndpointId(int parentOperationId) {
+ this.parentEndpointName = parentOperationId + "";
}
public ID getTraceSegmentId() {
@@ -256,12 +256,12 @@ public class ContextCarrier implements Serializable {
this.spanId = spanId;
}
- public int getParentApplicationInstanceId() {
- return parentApplicationInstanceId;
+ public int getParentServiceInstanceId() {
+ return parentServiceInstanceId;
}
- void setParentApplicationInstanceId(int parentApplicationInstanceId) {
- this.parentApplicationInstanceId = parentApplicationInstanceId;
+ void setParentServiceInstanceId(int parentServiceInstanceId) {
+ this.parentServiceInstanceId = parentServiceInstanceId;
}
public String getPeerHost() {
@@ -288,16 +288,16 @@ public class ContextCarrier implements Serializable {
return primaryDistributedTraceId;
}
- public String getParentOperationName() {
- return parentOperationName;
+ public String getParentEndpointName() {
+ return parentEndpointName;
}
- public int getEntryApplicationInstanceId() {
- return entryApplicationInstanceId;
+ public int getEntryServiceInstanceId() {
+ return entryServiceInstanceId;
}
- public void setEntryApplicationInstanceId(int entryApplicationInstanceId) {
- this.entryApplicationInstanceId = entryApplicationInstanceId;
+ public void setEntryServiceInstanceId(int entryServiceInstanceId) {
+ this.entryServiceInstanceId = entryServiceInstanceId;
}
public enum HeaderVersion {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
index 170c994..2d595d6 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
@@ -112,7 +112,7 @@ public class TracingContext implements AbstractTracerContext {
carrier.setTraceSegmentId(this.segment.getTraceSegmentId());
carrier.setSpanId(span.getSpanId());
- carrier.setParentApplicationInstanceId(segment.getApplicationInstanceId());
+ carrier.setParentServiceInstanceId(segment.getApplicationInstanceId());
if (DictionaryUtil.isNull(peerId)) {
carrier.setPeerHost(peer);
@@ -134,21 +134,21 @@ public class TracingContext implements AbstractTracerContext {
operationName = firstSpan.getOperationName();
entryApplicationInstanceId = this.segment.getApplicationInstanceId();
}
- carrier.setEntryApplicationInstanceId(entryApplicationInstanceId);
+ carrier.setEntryServiceInstanceId(entryApplicationInstanceId);
if (operationId == DictionaryUtil.nullValue()) {
if (!StringUtil.isEmpty(operationName)) {
- carrier.setEntryOperationName(operationName);
+ carrier.setEntryEndpointName(operationName);
}
} else {
- carrier.setEntryOperationId(operationId);
+ carrier.setEntryEndpointId(operationId);
}
int parentOperationId = first().getOperationId();
if (parentOperationId == DictionaryUtil.nullValue()) {
- carrier.setParentOperationName(first().getOperationName());
+ carrier.setParentEndpointName(first().getOperationName());
} else {
- carrier.setParentOperationId(parentOperationId);
+ carrier.setParentEndpointId(parentOperationId);
}
carrier.setDistributedTraceIds(this.segment.getRelatedGlobalTraces());
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegmentRef.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegmentRef.java
index 72db198..04ab37a 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegmentRef.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegmentRef.java
@@ -65,15 +65,15 @@ public class TraceSegmentRef {
this.type = SegmentRefType.CROSS_PROCESS;
this.traceSegmentId = carrier.getTraceSegmentId();
this.spanId = carrier.getSpanId();
- this.parentApplicationInstanceId = carrier.getParentApplicationInstanceId();
- this.entryApplicationInstanceId = carrier.getEntryApplicationInstanceId();
+ this.parentApplicationInstanceId = carrier.getParentServiceInstanceId();
+ this.entryApplicationInstanceId = carrier.getEntryServiceInstanceId();
String host = carrier.getPeerHost();
if (host.charAt(0) == '#') {
this.peerHost = host.substring(1);
} else {
this.peerId = Integer.parseInt(host);
}
- String entryOperationName = carrier.getEntryOperationName();
+ String entryOperationName = carrier.getEntryEndpointName();
if (!StringUtil.isEmpty(entryOperationName)) {
if (entryOperationName.charAt(0) == '#') {
this.entryOperationName = entryOperationName.substring(1);
@@ -81,7 +81,7 @@ public class TraceSegmentRef {
this.entryOperationId = Integer.parseInt(entryOperationName);
}
}
- String parentOperationName = carrier.getParentOperationName();
+ String parentOperationName = carrier.getParentEndpointName();
if (!StringUtil.isEmpty(parentOperationName)) {
if (parentOperationName.charAt(0) == '#') {
this.parentOperationName = parentOperationName.substring(1);
diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV2HeaderTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV2HeaderTest.java
index 00f4d9f..a2821d7 100644
--- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV2HeaderTest.java
+++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV2HeaderTest.java
@@ -98,11 +98,11 @@ public class ContextCarrierV2HeaderTest {
contextCarrier.setTraceSegmentId(new ID(1, 2, 3));
contextCarrier.setDistributedTraceIds(distributedTraceIds);
contextCarrier.setSpanId(4);
- contextCarrier.setEntryApplicationInstanceId(1);
- contextCarrier.setParentApplicationInstanceId(1);
+ contextCarrier.setEntryServiceInstanceId(1);
+ contextCarrier.setParentServiceInstanceId(1);
contextCarrier.setPeerHost("127.0.0.1:8080");
- contextCarrier.setEntryOperationName("/portal");
- contextCarrier.setParentOperationId(123);
+ contextCarrier.setEntryEndpointName("/portal");
+ contextCarrier.setParentEndpointId(123);
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
@@ -152,11 +152,11 @@ public class ContextCarrierV2HeaderTest {
contextCarrier.setTraceSegmentId(new ID(1, 2, 3));
contextCarrier.setDistributedTraceIds(distributedTraceIds);
contextCarrier.setSpanId(4);
- contextCarrier.setEntryApplicationInstanceId(1);
- contextCarrier.setParentApplicationInstanceId(1);
+ contextCarrier.setEntryServiceInstanceId(1);
+ contextCarrier.setParentServiceInstanceId(1);
contextCarrier.setPeerHost("127.0.0.1:8080");
- contextCarrier.setEntryOperationName("/portal");
- contextCarrier.setParentOperationId(123);
+ contextCarrier.setEntryEndpointName("/portal");
+ contextCarrier.setParentEndpointId(123);
CarrierItem next = contextCarrier.items();
String headerValue = null;
@@ -188,8 +188,8 @@ public class ContextCarrierV2HeaderTest {
Assert.assertEquals(contextCarrier.getPeerHost(), contextCarrier2.getPeerHost());
Assert.assertEquals(contextCarrier.getDistributedTraceId(), contextCarrier2.getDistributedTraceId());
Assert.assertEquals(contextCarrier.getTraceSegmentId(), contextCarrier2.getTraceSegmentId());
- Assert.assertEquals(contextCarrier.getEntryOperationName(), contextCarrier2.getEntryOperationName());
- Assert.assertEquals(contextCarrier.getEntryApplicationInstanceId(), contextCarrier2.getEntryApplicationInstanceId());
- Assert.assertEquals(contextCarrier.getParentApplicationInstanceId(), contextCarrier2.getParentApplicationInstanceId());
+ Assert.assertEquals(contextCarrier.getEntryEndpointName(), contextCarrier2.getEntryEndpointName());
+ Assert.assertEquals(contextCarrier.getEntryServiceInstanceId(), contextCarrier2.getEntryServiceInstanceId());
+ Assert.assertEquals(contextCarrier.getParentServiceInstanceId(), contextCarrier2.getParentServiceInstanceId());
}
}
diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextManagerTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextManagerTest.java
index 7ff58d4..610d4eb 100644
--- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextManagerTest.java
+++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextManagerTest.java
@@ -160,7 +160,7 @@ public class ContextManagerTest {
assertThat(logs.get(0).getLogs().size(), is(4));
assertThat(injectContextCarrier.getSpanId(), is(1));
- assertThat(injectContextCarrier.getEntryOperationName(), is("#/portal/"));
+ assertThat(injectContextCarrier.getEntryEndpointName(), is("#/portal/"));
assertThat(injectContextCarrier.getPeerHost(), is("#127.0.0.1:12800"));
}
@@ -211,11 +211,11 @@ public class ContextManagerTest {
assertThat(firstExitSpanContextCarrier.getPeerHost(), is("#127.0.0.1:8080"));
assertThat(firstExitSpanContextCarrier.getSpanId(), is(1));
- assertThat(firstExitSpanContextCarrier.getEntryOperationName(), is("#/testEntrySpan"));
+ assertThat(firstExitSpanContextCarrier.getEntryEndpointName(), is("#/testEntrySpan"));
assertThat(secondExitSpanContextCarrier.getPeerHost(), is("#127.0.0.1:8080"));
assertThat(secondExitSpanContextCarrier.getSpanId(), is(1));
- assertThat(secondExitSpanContextCarrier.getEntryOperationName(), is("#/testEntrySpan"));
+ assertThat(secondExitSpanContextCarrier.getEntryEndpointName(), is("#/testEntrySpan"));
}
diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContextTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContextTest.java
index 67e4123..48fca08 100644
--- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContextTest.java
+++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContextTest.java
@@ -99,7 +99,7 @@ public class IgnoredTracerContextTest {
ContextManager.stopSpan();
assertThat(abstractSpan.getClass().getName(), is(NoopSpan.class.getName()));
- assertNull(contextCarrier.getEntryOperationName());
+ assertNull(contextCarrier.getEntryEndpointName());
assertThat(contextCarrier.getSpanId(), is(-1));
assertNull(contextCarrier.getPeerHost());
diff --git a/docs/en/protocols/Trace-Data-Protocol.md b/docs/en/protocols/Trace-Data-Protocol.md
index 7c2eaac..01208e0 100644
--- a/docs/en/protocols/Trace-Data-Protocol.md
+++ b/docs/en/protocols/Trace-Data-Protocol.md
@@ -56,13 +56,13 @@ Input:
"rs": [ //TraceSegmentReference
{
"pts": [230150, 185809, 24040000], //parentTraceSegmentId
- "pii": 2, //parentApplicationInstanceId
+ "pii": 2, //parentServiceInstanceId
"psp": 1, //parentSpanId
"psi": 0, //parentServiceId
"psn": "/dubbox-case/case/dubbox-rest", //parentServiceName
"ni": 0, //networkAddressId
"nn": "172.25.0.4:20880", //networkAddress
- "eii": 2, //entryApplicationInstanceId
+ "eii": 2, //entryServiceInstanceId
"esi": 0, //entryServiceId
"esn": "/dubbox-case/case/dubbox-rest", //entryServiceName
"rv": 0 //RefTypeValue
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java
index 322abf9..c7b11da 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java
@@ -40,6 +40,7 @@ public class SegmentDispatcher implements SourceDispatcher<Segment> {
segment.setIsError(source.getIsError());
segment.setDataBinary(source.getDataBinary());
segment.setTimeBucket(source.getTimeBucket());
+ segment.setVersion(source.getVersion());
RecordProcess.INSTANCE.in(segment);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
index c974cff..3cf80c2 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
@@ -47,6 +47,7 @@ public class SegmentRecord extends Record {
public static final String LATENCY = "latency";
public static final String IS_ERROR = "is_error";
public static final String DATA_BINARY = "data_binary";
+ public static final String VERSION = "version";
@Setter @Getter @Column(columnName = SEGMENT_ID) @IDColumn private String segmentId;
@Setter @Getter @Column(columnName = TRACE_ID) @IDColumn private String traceId;
@@ -58,6 +59,7 @@ public class SegmentRecord extends Record {
@Setter @Getter @Column(columnName = LATENCY) @IDColumn private int latency;
@Setter @Getter @Column(columnName = IS_ERROR) @IDColumn private int isError;
@Setter @Getter @Column(columnName = DATA_BINARY) @IDColumn private byte[] dataBinary;
+ @Setter @Getter @Column(columnName = VERSION) @IDColumn private int version;
@Override public String id() {
return segmentId;
@@ -82,6 +84,7 @@ public class SegmentRecord extends Record {
} else {
map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
}
+ map.put(VERSION, storageData.getVersion());
return map;
}
@@ -102,6 +105,7 @@ public class SegmentRecord extends Record {
} else {
record.setDataBinary(Base64.getDecoder().decode((String)dbMap.get(DATA_BINARY)));
}
+ record.setVersion(((Number)dbMap.get(VERSION)).intValue());
return record;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java
index 110f564..041f7df 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java
@@ -45,4 +45,5 @@ public class Segment extends Source {
@Setter @Getter private int latency;
@Setter @Getter private int isError;
@Setter @Getter private byte[] dataBinary;
+ @Setter @Getter private int version;
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index 462fbf2..10147a9 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.grpc.TraceSegmentServiceHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.rest.TraceSegmentServletHandler;
+import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.grpc.TraceSegmentReportServiceHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment.SegmentSpanListener;
@@ -38,6 +39,7 @@ public class TraceModuleProvider extends ModuleProvider {
private final TraceServiceModuleConfig moduleConfig;
private SegmentParse.Producer segmentProducer;
+ private SegmentParseV2.Producer segmentProducerV2;
public TraceModuleProvider() {
this.moduleConfig = new TraceServiceModuleConfig();
@@ -62,7 +64,15 @@ public class TraceModuleProvider extends ModuleProvider {
listenerManager.add(new SegmentSpanListener.Factory());
segmentProducer = new SegmentParse.Producer(getManager(), listenerManager);
- this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducer));
+
+ listenerManager = new SegmentParserListenerManager();
+ listenerManager.add(new MultiScopesSpanListener.Factory());
+ listenerManager.add(new ServiceMappingSpanListener.Factory());
+ listenerManager.add(new SegmentSpanListener.Factory());
+
+ segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager);
+
+ this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2));
}
@Override public void start() throws ModuleStartException {
@@ -71,10 +81,14 @@ public class TraceModuleProvider extends ModuleProvider {
try {
grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentProducer));
+ grpcHandlerRegister.addHandler(new TraceSegmentReportServiceHandler(segmentProducerV2));
jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentProducer));
- SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(segmentProducer, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart());
+ SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(segmentProducer, moduleConfig.getBufferPath() + "-v5", moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart());
segmentProducer.setStandardizationWorker(standardizationWorker);
+
+ SegmentStandardizationWorker standardizationWorker2 = new SegmentStandardizationWorker(segmentProducer, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart());
+ segmentProducerV2.setStandardizationWorker(standardizationWorker2);
} catch (IOException e) {
throw new ModuleStartException(e.getMessage(), e);
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/grpc/TraceSegmentReportServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/grpc/TraceSegmentReportServiceHandler.java
new file mode 100644
index 0000000..11a5d66
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/grpc/TraceSegmentReportServiceHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.grpc;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.network.common.Commands;
+import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
+import org.apache.skywalking.apm.network.language.agent.v2.TraceSegmentReportServiceGrpc;
+import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
+import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.grpc.TraceSegmentServiceHandler;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParseV2;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TraceSegmentReportServiceHandler extends TraceSegmentReportServiceGrpc.TraceSegmentReportServiceImplBase implements GRPCHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
+
+ private final Boolean debug;
+ private final SegmentParseV2.Producer segmentProducer;
+
+ public TraceSegmentReportServiceHandler(SegmentParseV2.Producer segmentProducer) {
+ this.debug = System.getProperty("debug") != null;
+ this.segmentProducer = segmentProducer;
+ }
+
+ @Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Commands> responseObserver) {
+ return new StreamObserver<UpstreamSegment>() {
+ @Override public void onNext(UpstreamSegment segment) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("receive segment");
+ }
+
+ segmentProducer.send(segment, SegmentSource.Agent);
+ }
+
+ @Override public void onError(Throwable throwable) {
+ logger.error(throwable.getMessage(), throwable);
+ responseObserver.onCompleted();
+ }
+
+ @Override public void onCompleted() {
+ responseObserver.onNext(Commands.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+ };
+ }
+}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
index 4f3e48a..d4bd861 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
@@ -19,16 +19,32 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
import com.google.protobuf.InvalidProtocolBufferException;
-import java.util.*;
+import java.util.LinkedList;
+import java.util.List;
import lombok.Setter;
-import org.apache.skywalking.apm.network.language.agent.*;
+import org.apache.skywalking.apm.network.language.agent.SpanType;
+import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.oap.server.library.buffer.DataStreamReader;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.ReferenceDecorator;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentDecorator;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.ExitSpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.FirstSpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.GlobalTraceIdsListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.LocalSpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.ReferenceIdExchanger;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardization;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardizationWorker;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SpanIdExchanger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
@@ -50,6 +66,7 @@ public class SegmentParse {
this.segmentCoreInfo = new SegmentCoreInfo();
this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
+ this.segmentCoreInfo.setV2(false);
}
public boolean parse(UpstreamSegment segment, Source source) {
@@ -103,19 +120,20 @@ public class SegmentParse {
}
segmentCoreInfo.setSegmentId(segmentIdBuilder.toString());
- segmentCoreInfo.setApplicationId(segmentDecorator.getApplicationId());
- segmentCoreInfo.setApplicationInstanceId(segmentDecorator.getApplicationInstanceId());
+ segmentCoreInfo.setServiceId(segmentDecorator.getServiceId());
+ segmentCoreInfo.setServiceInstanceId(segmentDecorator.getServiceInstanceId());
segmentCoreInfo.setDataBinary(segmentDecorator.toByteArray());
+ segmentCoreInfo.setV2(false);
for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
- if (!SpanIdExchanger.getInstance(moduleManager).exchange(spanDecorator, segmentCoreInfo.getApplicationId())) {
+ if (!SpanIdExchanger.getInstance(moduleManager).exchange(spanDecorator, segmentCoreInfo.getServiceId())) {
return false;
} else {
for (int j = 0; j < spanDecorator.getRefsCount(); j++) {
ReferenceDecorator referenceDecorator = spanDecorator.getRefs(j);
- if (!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator, segmentCoreInfo.getApplicationId())) {
+ if (!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator, segmentCoreInfo.getServiceId())) {
return false;
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
similarity index 74%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
index 4f3e48a..2058f61 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java
@@ -19,23 +19,41 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
import com.google.protobuf.InvalidProtocolBufferException;
-import java.util.*;
+import java.util.LinkedList;
+import java.util.List;
import lombok.Setter;
-import org.apache.skywalking.apm.network.language.agent.*;
+import org.apache.skywalking.apm.network.language.agent.SpanType;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
+import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
import org.apache.skywalking.oap.server.library.buffer.DataStreamReader;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
-import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*;
-import org.slf4j.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.ReferenceDecorator;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentDecorator;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.ExitSpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.FirstSpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.GlobalTraceIdsListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.LocalSpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.ReferenceIdExchanger;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardization;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardizationWorker;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SpanIdExchanger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * @author peng-yongsheng
+ * SegmentParseV2 is a replication of SegmentParse, but be compatible with v2 trace protocol.
+ *
+ * @author wusheng
*/
-public class SegmentParse {
+public class SegmentParseV2 {
- private static final Logger logger = LoggerFactory.getLogger(SegmentParse.class);
+ private static final Logger logger = LoggerFactory.getLogger(SegmentParseV2.class);
private final ModuleManager moduleManager;
private final List<SpanListener> spanListeners;
@@ -43,21 +61,22 @@ public class SegmentParse {
private final SegmentCoreInfo segmentCoreInfo;
@Setter private SegmentStandardizationWorker standardizationWorker;
- private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+ private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
this.spanListeners = new LinkedList<>();
this.segmentCoreInfo = new SegmentCoreInfo();
this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
+ this.segmentCoreInfo.setV2(true);
}
- public boolean parse(UpstreamSegment segment, Source source) {
+ public boolean parse(UpstreamSegment segment, SegmentSource source) {
createSpanListeners();
try {
List<UniqueId> traceIds = segment.getGlobalTraceIdsList();
- TraceSegmentObject segmentObject = parseBinarySegment(segment);
+ SegmentObject segmentObject = parseBinarySegment(segment);
SegmentDecorator segmentDecorator = new SegmentDecorator(segmentObject);
@@ -66,7 +85,7 @@ public class SegmentParse {
logger.debug("This segment id exchange not success, write to buffer file, id: {}", segmentCoreInfo.getSegmentId());
}
- if (source.equals(Source.Agent)) {
+ if (source.equals(SegmentSource.Agent)) {
writeToBufferFile(segmentCoreInfo.getSegmentId(), segment);
}
return false;
@@ -83,8 +102,8 @@ public class SegmentParse {
}
}
- private TraceSegmentObject parseBinarySegment(UpstreamSegment segment) throws InvalidProtocolBufferException {
- return TraceSegmentObject.parseFrom(segment.getSegment());
+ private SegmentObject parseBinarySegment(UpstreamSegment segment) throws InvalidProtocolBufferException {
+ return SegmentObject.parseFrom(segment.getSegment());
}
private boolean preBuild(List<UniqueId> traceIds, SegmentDecorator segmentDecorator) {
@@ -103,19 +122,20 @@ public class SegmentParse {
}
segmentCoreInfo.setSegmentId(segmentIdBuilder.toString());
- segmentCoreInfo.setApplicationId(segmentDecorator.getApplicationId());
- segmentCoreInfo.setApplicationInstanceId(segmentDecorator.getApplicationInstanceId());
+ segmentCoreInfo.setServiceId(segmentDecorator.getServiceId());
+ segmentCoreInfo.setServiceInstanceId(segmentDecorator.getServiceInstanceId());
segmentCoreInfo.setDataBinary(segmentDecorator.toByteArray());
+ segmentCoreInfo.setV2(true);
for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
- if (!SpanIdExchanger.getInstance(moduleManager).exchange(spanDecorator, segmentCoreInfo.getApplicationId())) {
+ if (!SpanIdExchanger.getInstance(moduleManager).exchange(spanDecorator, segmentCoreInfo.getServiceId())) {
return false;
} else {
for (int j = 0; j < spanDecorator.getRefsCount(); j++) {
ReferenceDecorator referenceDecorator = spanDecorator.getRefs(j);
- if (!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator, segmentCoreInfo.getApplicationId())) {
+ if (!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator, segmentCoreInfo.getServiceId())) {
return false;
}
}
@@ -213,10 +233,6 @@ public class SegmentParse {
listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager)));
}
- public enum Source {
- Agent, Buffer
- }
-
public static class Producer implements DataStreamReader.CallBack<UpstreamSegment> {
@Setter private SegmentStandardizationWorker standardizationWorker;
@@ -228,16 +244,16 @@ public class SegmentParse {
this.listenerManager = listenerManager;
}
- public void send(UpstreamSegment segment, Source source) {
- SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+ public void send(UpstreamSegment segment, SegmentSource source) {
+ SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager);
segmentParse.setStandardizationWorker(standardizationWorker);
segmentParse.parse(segment, source);
}
@Override public boolean call(UpstreamSegment segment) {
- SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+ SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager);
segmentParse.setStandardizationWorker(standardizationWorker);
- return segmentParse.parse(segment, Source.Buffer);
+ return segmentParse.parse(segment, SegmentSource.Buffer);
}
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserServiceImpl.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserServiceImpl.java
index 2d0ab6d..5392e62 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserServiceImpl.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserServiceImpl.java
@@ -24,15 +24,15 @@ import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
* @author wusheng
*/
public class SegmentParserServiceImpl implements ISegmentParserService {
- private final SegmentParse.Producer segmentProducer;
+ private final SegmentParseV2.Producer segmentProducer;
public SegmentParserServiceImpl(
- SegmentParse.Producer segmentProducer) {
+ SegmentParseV2.Producer segmentProducer) {
this.segmentProducer = segmentProducer;
}
@Override
public void send(UpstreamSegment segment) {
- segmentProducer.send(segment, SegmentParse.Source.Agent);
+ segmentProducer.send(segment,SegmentSource.Agent);
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentSource.java
similarity index 71%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java
copy to oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentSource.java
index a0c0b7f..7bbf3f9 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentSource.java
@@ -16,22 +16,11 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator;
-
-import lombok.*;
+package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
/**
- * @author peng-yongsheng
+ * @author wusheng
*/
-@Getter
-@Setter
-public class SegmentCoreInfo {
- private String segmentId;
- private int applicationId;
- private int applicationInstanceId;
- private long startTime;
- private long endTime;
- private boolean isError;
- private long minuteTimeBucket;
- private byte[] dataBinary;
+public enum SegmentSource {
+ Agent, Buffer
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/ReferenceDecorator.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/ReferenceDecorator.java
index 964d6e8..e7f4dd8 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/ReferenceDecorator.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/ReferenceDecorator.java
@@ -18,7 +18,10 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator;
-import org.apache.skywalking.apm.network.language.agent.*;
+import org.apache.skywalking.apm.network.language.agent.RefType;
+import org.apache.skywalking.apm.network.language.agent.TraceSegmentReference;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+import org.apache.skywalking.apm.network.language.agent.v2.SegmentReference;
/**
* @author peng-yongsheng
@@ -29,131 +32,165 @@ public class ReferenceDecorator implements StandardBuilder {
private StandardBuilder standardBuilder;
private TraceSegmentReference referenceObject;
private TraceSegmentReference.Builder referenceBuilder;
+ private final boolean isV2;
+ private SegmentReference referenceObjectV2;
+ private SegmentReference.Builder referenceBuilderV2;
public ReferenceDecorator(TraceSegmentReference referenceObject, StandardBuilder standardBuilder) {
this.referenceObject = referenceObject;
this.standardBuilder = standardBuilder;
+ isV2 = false;
}
public ReferenceDecorator(TraceSegmentReference.Builder referenceBuilder, StandardBuilder standardBuilder) {
this.referenceBuilder = referenceBuilder;
this.standardBuilder = standardBuilder;
this.isOrigin = false;
+ isV2 = false;
+ }
+
+ public ReferenceDecorator(SegmentReference referenceObject, StandardBuilder standardBuilder) {
+ this.referenceObjectV2 = referenceObject;
+ this.standardBuilder = standardBuilder;
+ isV2 = true;
+ }
+
+ public ReferenceDecorator(SegmentReference.Builder referenceBuilder, StandardBuilder standardBuilder) {
+ this.referenceBuilderV2 = referenceBuilder;
+ this.standardBuilder = standardBuilder;
+ this.isOrigin = false;
+ isV2 = true;
}
public RefType getRefType() {
if (isOrigin) {
- return referenceObject.getRefType();
+ return isV2 ? referenceObjectV2.getRefType() : referenceObject.getRefType();
} else {
- return referenceBuilder.getRefType();
+ return isV2 ? referenceBuilderV2.getRefType() : referenceBuilder.getRefType();
}
}
public int getRefTypeValue() {
if (isOrigin) {
- return referenceObject.getRefTypeValue();
+ return isV2 ? referenceObjectV2.getRefTypeValue() : referenceObject.getRefTypeValue();
} else {
- return referenceBuilder.getRefTypeValue();
+ return isV2 ? referenceBuilderV2.getRefTypeValue() : referenceBuilder.getRefTypeValue();
}
}
- public int getEntryServiceId() {
+ public int getEntryEndpointId() {
if (isOrigin) {
- return referenceObject.getEntryServiceId();
+ return isV2 ? referenceObjectV2.getEntryEndpointId() : referenceObject.getEntryServiceId();
} else {
- return referenceBuilder.getEntryServiceId();
+ return isV2 ? referenceBuilderV2.getEntryEndpointId() : referenceBuilder.getEntryServiceId();
}
}
- public void setEntryServiceId(int value) {
+ public void setEntryEndpointId(int value) {
if (isOrigin) {
toBuilder();
}
- referenceBuilder.setEntryServiceId(value);
+ if (isV2) {
+ referenceBuilderV2.setEntryEndpointId(value);
+ } else {
+ referenceBuilder.setEntryServiceId(value);
+ }
}
- public String getEntryServiceName() {
+ public String getEntryEndpointName() {
if (isOrigin) {
- return referenceObject.getEntryServiceName();
+ return isV2 ? referenceObjectV2.getEntryEndpoint() : referenceObject.getEntryServiceName();
} else {
- return referenceBuilder.getEntryServiceName();
+ return isV2 ? referenceBuilderV2.getEntryEndpoint() : referenceBuilder.getEntryServiceName();
}
}
- public void setEntryServiceName(String value) {
+ public void setEntryEndpointName(String value) {
if (isOrigin) {
toBuilder();
}
- referenceBuilder.setEntryServiceName(value);
+ if (isV2) {
+ referenceBuilderV2.setEntryEndpoint(value);
+ } else {
+ referenceBuilder.setEntryServiceName(value);
+ }
}
- public int getEntryApplicationInstanceId() {
+ public int getEntryServiceInstanceId() {
if (isOrigin) {
- return referenceObject.getEntryApplicationInstanceId();
+ return isV2 ? referenceObjectV2.getEntryServiceInstanceId() : referenceObject.getEntryApplicationInstanceId();
} else {
- return referenceBuilder.getEntryApplicationInstanceId();
+ return isV2 ? referenceBuilderV2.getEntryServiceInstanceId() : referenceBuilder.getEntryApplicationInstanceId();
}
}
- public int getParentApplicationInstanceId() {
+ public int getParentServiceInstanceId() {
if (isOrigin) {
- return referenceObject.getParentApplicationInstanceId();
+ return isV2 ? referenceObjectV2.getParentServiceInstanceId() : referenceObject.getParentApplicationInstanceId();
} else {
- return referenceBuilder.getParentApplicationInstanceId();
+ return isV2 ? referenceBuilderV2.getParentServiceInstanceId() : referenceBuilder.getParentApplicationInstanceId();
}
}
- public int getParentServiceId() {
+ public int getParentEndpointId() {
if (isOrigin) {
- return referenceObject.getParentServiceId();
+ return isV2 ? referenceObjectV2.getParentEndpointId() : referenceObject.getParentServiceId();
} else {
- return referenceBuilder.getParentServiceId();
+ return isV2 ? referenceBuilderV2.getParentEndpointId() : referenceBuilder.getParentServiceId();
}
}
- public void setParentServiceId(int value) {
+ public void setParentEndpointId(int value) {
if (isOrigin) {
toBuilder();
}
- referenceBuilder.setParentServiceId(value);
+ if (isV2) {
+ referenceBuilderV2.setParentServiceInstanceId(value);
+ } else {
+ referenceBuilder.setParentServiceId(value);
+ }
}
public int getParentSpanId() {
if (isOrigin) {
- return referenceObject.getParentSpanId();
+ return isV2 ? referenceObjectV2.getParentSpanId() : referenceObject.getParentSpanId();
} else {
- return referenceBuilder.getParentSpanId();
+ return isV2 ? referenceBuilderV2.getParentSpanId() : referenceBuilder.getParentSpanId();
}
}
- public String getParentServiceName() {
+ public String getParentEndpointName() {
if (isOrigin) {
- return referenceObject.getParentServiceName();
+ return isV2 ? referenceObjectV2.getParentEndpoint() : referenceObject.getParentServiceName();
} else {
- return referenceBuilder.getParentServiceName();
+ return isV2 ? referenceBuilderV2.getParentEndpoint() : referenceBuilder.getParentServiceName();
}
}
- public void setParentServiceName(String value) {
+ public void setParentEndpointName(String value) {
if (isOrigin) {
toBuilder();
}
- referenceBuilder.setParentServiceName(value);
+ if (isV2) {
+ referenceBuilderV2.setParentEndpoint(value);
+ } else {
+ referenceBuilder.setParentServiceName(value);
+ }
}
public UniqueId getParentTraceSegmentId() {
if (isOrigin) {
- return referenceObject.getParentTraceSegmentId();
+ return isV2 ? referenceObjectV2.getParentTraceSegmentId() : referenceObject.getParentTraceSegmentId();
} else {
- return referenceBuilder.getParentTraceSegmentId();
+ return isV2 ? referenceBuilderV2.getParentTraceSegmentId() : referenceBuilder.getParentTraceSegmentId();
}
}
public int getNetworkAddressId() {
if (isOrigin) {
- return referenceObject.getNetworkAddressId();
+ return isV2 ? referenceObjectV2.getNetworkAddressId() : referenceObject.getNetworkAddressId();
} else {
- return referenceBuilder.getNetworkAddressId();
+ return isV2 ? referenceBuilderV2.getNetworkAddressId() : referenceBuilder.getNetworkAddressId();
}
}
@@ -161,14 +198,18 @@ public class ReferenceDecorator implements StandardBuilder {
if (isOrigin) {
toBuilder();
}
- referenceBuilder.setNetworkAddressId(value);
+ if (isV2) {
+ referenceBuilderV2.setNetworkAddressId(value);
+ } else {
+ referenceBuilder.setNetworkAddressId(value);
+ }
}
public String getNetworkAddress() {
if (isOrigin) {
- return referenceObject.getNetworkAddress();
+ return isV2 ? referenceObjectV2.getNetworkAddress() : referenceObject.getNetworkAddress();
} else {
- return referenceBuilder.getNetworkAddress();
+ return isV2 ? referenceBuilderV2.getNetworkAddress() : referenceBuilder.getNetworkAddress();
}
}
@@ -176,13 +217,21 @@ public class ReferenceDecorator implements StandardBuilder {
if (isOrigin) {
toBuilder();
}
- referenceBuilder.setNetworkAddress(value);
+ if (isV2) {
+ referenceBuilderV2.setNetworkAddress(value);
+ } else {
+ referenceBuilder.setNetworkAddress(value);
+ }
}
@Override public void toBuilder() {
if (this.isOrigin) {
this.isOrigin = false;
- referenceBuilder = referenceObject.toBuilder();
+ if (isV2) {
+ referenceBuilderV2 = referenceObjectV2.toBuilder();
+ } else {
+ referenceBuilder = referenceObject.toBuilder();
+ }
standardBuilder.toBuilder();
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java
index a0c0b7f..d78bea8 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java
@@ -27,11 +27,12 @@ import lombok.*;
@Setter
public class SegmentCoreInfo {
private String segmentId;
- private int applicationId;
- private int applicationInstanceId;
+ private int serviceId;
+ private int serviceInstanceId;
private long startTime;
private long endTime;
private boolean isError;
private long minuteTimeBucket;
private byte[] dataBinary;
+ private boolean isV2;
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentDecorator.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentDecorator.java
index 29232bd..f1d2451 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentDecorator.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentDecorator.java
@@ -18,7 +18,9 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator;
-import org.apache.skywalking.apm.network.language.agent.*;
+import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
import static java.util.Objects.isNull;
@@ -29,35 +31,55 @@ public class SegmentDecorator implements StandardBuilder {
private boolean isOrigin = true;
private final TraceSegmentObject segmentObject;
private TraceSegmentObject.Builder segmentBuilder;
+ private final boolean isV2;
+ private final SegmentObject segmentObjectV2;
+ private SegmentObject.Builder segmentBuilderV2;
private final SpanDecorator[] spanDecorators;
public SegmentDecorator(TraceSegmentObject segmentObject) {
this.segmentObject = segmentObject;
+ this.segmentObjectV2 = null;
this.spanDecorators = new SpanDecorator[segmentObject.getSpansCount()];
+ isV2 = false;
}
- public int getApplicationId() {
- return segmentObject.getApplicationId();
+ public SegmentDecorator(SegmentObject segmentObjectV2) {
+ this.segmentObjectV2 = segmentObjectV2;
+ this.segmentObject = null;
+ this.spanDecorators = new SpanDecorator[segmentObject.getSpansCount()];
+ isV2 = true;
+ }
+
+ public int getServiceId() {
+ return isV2 ? segmentObjectV2.getServiceId() : segmentObject.getApplicationId();
}
- public int getApplicationInstanceId() {
- return segmentObject.getApplicationInstanceId();
+ public int getServiceInstanceId() {
+ return isV2 ? segmentObjectV2.getServiceInstanceId() : segmentObject.getApplicationInstanceId();
}
public UniqueId getTraceSegmentId() {
- return segmentObject.getTraceSegmentId();
+ return isV2 ? segmentObjectV2.getTraceSegmentId() : segmentObject.getTraceSegmentId();
}
public int getSpansCount() {
- return segmentObject.getSpansCount();
+ return isV2 ? segmentObjectV2.getSpansCount() : segmentObject.getSpansCount();
}
public SpanDecorator getSpans(int index) {
if (isNull(spanDecorators[index])) {
if (isOrigin) {
- spanDecorators[index] = new SpanDecorator(segmentObject.getSpans(index), this);
+ if (isV2) {
+ spanDecorators[index] = new SpanDecorator(segmentObjectV2.getSpans(index), this);
+ } else {
+ spanDecorators[index] = new SpanDecorator(segmentObject.getSpans(index), this);
+ }
} else {
- spanDecorators[index] = new SpanDecorator(segmentBuilder.getSpansBuilder(index), this);
+ if (isV2) {
+ spanDecorators[index] = new SpanDecorator(segmentBuilderV2.getSpansBuilder(index), this);
+ } else {
+ spanDecorators[index] = new SpanDecorator(segmentBuilder.getSpansBuilder(index), this);
+ }
}
}
return spanDecorators[index];
@@ -65,7 +87,7 @@ public class SegmentDecorator implements StandardBuilder {
public byte[] toByteArray() {
if (isOrigin) {
- return segmentObject.toByteArray();
+ return isV2 ? segmentObjectV2.toByteArray() : segmentObject.toByteArray();
} else {
return segmentBuilder.build().toByteArray();
}
@@ -74,7 +96,11 @@ public class SegmentDecorator implements StandardBuilder {
@Override public void toBuilder() {
if (isOrigin) {
this.isOrigin = false;
- this.segmentBuilder = segmentObject.toBuilder();
+ if (isV2) {
+ this.segmentBuilderV2 = segmentObjectV2.toBuilder();
+ } else {
+ this.segmentBuilder = segmentObject.toBuilder();
+ }
}
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SpanDecorator.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SpanDecorator.java
index bb00495..fb4cf36 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SpanDecorator.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SpanDecorator.java
@@ -18,7 +18,10 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator;
-import org.apache.skywalking.apm.network.language.agent.*;
+import org.apache.skywalking.apm.network.language.agent.SpanLayer;
+import org.apache.skywalking.apm.network.language.agent.SpanObject;
+import org.apache.skywalking.apm.network.language.agent.SpanType;
+import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2;
import static java.util.Objects.isNull;
@@ -26,16 +29,20 @@ import static java.util.Objects.isNull;
* @author peng-yongsheng
*/
public class SpanDecorator implements StandardBuilder {
+ private final boolean isV2;
private boolean isOrigin = true;
private StandardBuilder standardBuilder;
private SpanObject spanObject;
+ private SpanObjectV2 spanObjectV2;
private SpanObject.Builder spanBuilder;
+ private SpanObjectV2.Builder spanBuilderV2;
private final ReferenceDecorator[] referenceDecorators;
public SpanDecorator(SpanObject spanObject, StandardBuilder standardBuilder) {
this.spanObject = spanObject;
this.standardBuilder = standardBuilder;
this.referenceDecorators = new ReferenceDecorator[spanObject.getRefsCount()];
+ this.isV2 = false;
}
public SpanDecorator(SpanObject.Builder spanBuilder, StandardBuilder standardBuilder) {
@@ -43,77 +50,93 @@ public class SpanDecorator implements StandardBuilder {
this.standardBuilder = standardBuilder;
this.isOrigin = false;
this.referenceDecorators = new ReferenceDecorator[spanBuilder.getRefsCount()];
+ this.isV2 = false;
+ }
+
+ public SpanDecorator(SpanObjectV2 spanObject, StandardBuilder standardBuilder) {
+ this.spanObjectV2 = spanObject;
+ this.standardBuilder = standardBuilder;
+ this.referenceDecorators = new ReferenceDecorator[spanObject.getRefsCount()];
+ this.isV2 = true;
+ }
+
+ public SpanDecorator(SpanObjectV2.Builder spanBuilder, StandardBuilder standardBuilder) {
+ this.spanBuilderV2 = spanBuilder;
+ this.standardBuilder = standardBuilder;
+ this.isOrigin = false;
+ this.referenceDecorators = new ReferenceDecorator[spanBuilder.getRefsCount()];
+ this.isV2 = true;
}
public int getSpanId() {
if (isOrigin) {
- return spanObject.getSpanId();
+ return isV2 ? spanObjectV2.getSpanId() : spanObject.getSpanId();
} else {
- return spanBuilder.getSpanId();
+ return isV2 ? spanBuilderV2.getSpanId() : spanBuilder.getSpanId();
}
}
public int getParentSpanId() {
if (isOrigin) {
- return spanObject.getParentSpanId();
+ return isV2 ? spanObjectV2.getParentSpanId() : spanObject.getParentSpanId();
} else {
- return spanBuilder.getParentSpanId();
+ return isV2 ? spanBuilderV2.getParentSpanId() : spanBuilder.getParentSpanId();
}
}
public SpanType getSpanType() {
if (isOrigin) {
- return spanObject.getSpanType();
+ return isV2 ? spanObjectV2.getSpanType() : spanObject.getSpanType();
} else {
- return spanBuilder.getSpanType();
+ return isV2 ? spanBuilderV2.getSpanType() : spanBuilder.getSpanType();
}
}
public int getSpanTypeValue() {
if (isOrigin) {
- return spanObject.getSpanTypeValue();
+ return isV2 ? spanObjectV2.getSpanTypeValue() : spanObject.getSpanTypeValue();
} else {
- return spanBuilder.getSpanTypeValue();
+ return isV2 ? spanBuilderV2.getSpanTypeValue() : spanBuilder.getSpanTypeValue();
}
}
public SpanLayer getSpanLayer() {
if (isOrigin) {
- return spanObject.getSpanLayer();
+ return isV2 ? spanObjectV2.getSpanLayer() : spanObject.getSpanLayer();
} else {
- return spanBuilder.getSpanLayer();
+ return isV2 ? spanBuilderV2.getSpanLayer() : spanBuilder.getSpanLayer();
}
}
public int getSpanLayerValue() {
if (isOrigin) {
- return spanObject.getSpanLayerValue();
+ return isV2 ? spanObjectV2.getSpanLayerValue() : spanObject.getSpanLayerValue();
} else {
- return spanBuilder.getSpanLayerValue();
+ return isV2 ? spanBuilderV2.getSpanLayerValue() : spanBuilder.getSpanLayerValue();
}
}
public long getStartTime() {
if (isOrigin) {
- return spanObject.getStartTime();
+ return isV2 ? spanObjectV2.getStartTime() : spanObject.getStartTime();
} else {
- return spanBuilder.getStartTime();
+ return isV2 ? spanBuilderV2.getStartTime() : spanBuilder.getStartTime();
}
}
public long getEndTime() {
if (isOrigin) {
- return spanObject.getEndTime();
+ return isV2 ? spanObjectV2.getEndTime() : spanObject.getEndTime();
} else {
- return spanBuilder.getEndTime();
+ return isV2 ? spanBuilderV2.getEndTime() : spanBuilder.getEndTime();
}
}
public int getComponentId() {
if (isOrigin) {
- return spanObject.getComponentId();
+ return isV2 ? spanObjectV2.getComponentId() : spanObject.getComponentId();
} else {
- return spanBuilder.getComponentId();
+ return isV2 ? spanBuilderV2.getComponentId() : spanBuilder.getComponentId();
}
}
@@ -121,14 +144,18 @@ public class SpanDecorator implements StandardBuilder {
if (isOrigin) {
toBuilder();
}
- spanBuilder.setComponentId(value);
+ if (isV2) {
+ spanBuilderV2.setComponentId(value);
+ } else {
+ spanBuilder.setComponentId(value);
+ }
}
public String getComponent() {
if (isOrigin) {
- return spanObject.getComponent();
+ return isV2 ? spanObjectV2.getComponent() : spanObject.getComponent();
} else {
- return spanBuilder.getComponent();
+ return isV2 ? spanBuilderV2.getComponent() : spanBuilder.getComponent();
}
}
@@ -136,14 +163,18 @@ public class SpanDecorator implements StandardBuilder {
if (isOrigin) {
toBuilder();
}
- spanBuilder.setComponent(value);
+ if (isV2) {
+ spanBuilderV2.setComponent(value);
+ } else {
+ spanBuilder.setComponent(value);
+ }
}
public int getPeerId() {
if (isOrigin) {
- return spanObject.getPeerId();
+ return isV2 ? spanObjectV2.getPeerId() : spanObject.getPeerId();
} else {
- return spanBuilder.getPeerId();
+ return isV2 ? spanBuilderV2.getPeerId() : spanBuilder.getPeerId();
}
}
@@ -151,14 +182,18 @@ public class SpanDecorator implements StandardBuilder {
if (isOrigin) {
toBuilder();
}
- spanBuilder.setPeerId(peerId);
+ if (isV2) {
+ spanBuilderV2.setPeerId(peerId);
+ } else {
+ spanBuilder.setPeerId(peerId);
+ }
}
public String getPeer() {
if (isOrigin) {
- return spanObject.getPeer();
+ return isV2 ? spanObjectV2.getPeer() : spanObject.getPeer();
} else {
- return spanBuilder.getPeer();
+ return isV2 ? spanBuilderV2.getPeer() : spanBuilder.getPeer();
}
}
@@ -166,14 +201,18 @@ public class SpanDecorator implements StandardBuilder {
if (isOrigin) {
toBuilder();
}
- spanBuilder.setPeer(peer);
+ if (isV2) {
+ spanBuilderV2.setPeer(peer);
+ } else {
+ spanBuilder.setPeer(peer);
+ }
}
public int getOperationNameId() {
if (isOrigin) {
- return spanObject.getOperationNameId();
+ return isV2 ? spanObjectV2.getOperationNameId() : spanObject.getOperationNameId();
} else {
- return spanBuilder.getOperationNameId();
+ return isV2 ? spanBuilderV2.getOperationNameId() : spanBuilder.getOperationNameId();
}
}
@@ -181,14 +220,18 @@ public class SpanDecorator implements StandardBuilder {
if (isOrigin) {
toBuilder();
}
- spanBuilder.setOperationNameId(value);
+ if (isV2) {
+ spanBuilderV2.setOperationNameId(value);
+ } else {
+ spanBuilder.setOperationNameId(value);
+ }
}
public String getOperationName() {
if (isOrigin) {
- return spanObject.getOperationName();
+ return isV2 ? spanObjectV2.getOperationName() : spanObject.getOperationName();
} else {
- return spanBuilder.getOperationName();
+ return isV2 ? spanBuilderV2.getOperationName() : spanBuilder.getOperationName();
}
}
@@ -196,31 +239,43 @@ public class SpanDecorator implements StandardBuilder {
if (isOrigin) {
toBuilder();
}
- spanBuilder.setOperationName(value);
+ if (isV2) {
+ spanBuilderV2.setOperationName(value);
+ } else {
+ spanBuilder.setOperationName(value);
+ }
}
public boolean getIsError() {
if (isOrigin) {
- return spanObject.getIsError();
+ return isV2 ? spanObjectV2.getIsError() : spanObject.getIsError();
} else {
- return spanBuilder.getIsError();
+ return isV2 ? spanBuilderV2.getIsError() : spanBuilder.getIsError();
}
}
public int getRefsCount() {
if (isOrigin) {
- return spanObject.getRefsCount();
+ return isV2 ? spanObjectV2.getRefsCount() : spanObject.getRefsCount();
} else {
- return spanBuilder.getRefsCount();
+ return isV2 ? spanBuilderV2.getRefsCount() : spanBuilder.getRefsCount();
}
}
public ReferenceDecorator getRefs(int index) {
if (isNull(referenceDecorators[index])) {
if (isOrigin) {
- referenceDecorators[index] = new ReferenceDecorator(spanObject.getRefs(index), this);
+ if (isV2) {
+ referenceDecorators[index] = new ReferenceDecorator(spanObjectV2.getRefs(index), this);
+ } else {
+ referenceDecorators[index] = new ReferenceDecorator(spanObject.getRefs(index), this);
+ }
} else {
- referenceDecorators[index] = new ReferenceDecorator(spanBuilder.getRefsBuilder(index), this);
+ if (isV2) {
+ referenceDecorators[index] = new ReferenceDecorator(spanBuilderV2.getRefsBuilder(index), this);
+ } else {
+ referenceDecorators[index] = new ReferenceDecorator(spanBuilder.getRefsBuilder(index), this);
+ }
}
}
return referenceDecorators[index];
@@ -229,7 +284,11 @@ public class SpanDecorator implements StandardBuilder {
@Override public void toBuilder() {
if (this.isOrigin) {
this.isOrigin = false;
- spanBuilder = spanObject.toBuilder();
+ if (isV2) {
+ spanBuilderV2 = spanObjectV2.toBuilder();
+ } else {
+ spanBuilder = spanObject.toBuilder();
+ }
standardBuilder.toBuilder();
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
index 4d8e65e..0b0996b 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
@@ -74,7 +74,7 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
ReferenceDecorator reference = spanDecorator.getRefs(i);
SourceBuilder sourceBuilder = new SourceBuilder();
- sourceBuilder.setSourceEndpointId(reference.getParentServiceId());
+ sourceBuilder.setSourceEndpointId(reference.getParentEndpointId());
if (spanDecorator.getSpanLayer().equals(SpanLayer.MQ)) {
int serviceIdByPeerId = serviceInventoryCache.getServiceId(reference.getNetworkAddressId());
@@ -82,12 +82,12 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
sourceBuilder.setSourceServiceInstanceId(instanceIdByPeerId);
sourceBuilder.setSourceServiceId(serviceIdByPeerId);
} else {
- sourceBuilder.setSourceServiceInstanceId(reference.getParentApplicationInstanceId());
- sourceBuilder.setSourceServiceId(instanceInventoryCache.get(reference.getParentApplicationInstanceId()).getServiceId());
+ sourceBuilder.setSourceServiceInstanceId(reference.getParentServiceInstanceId());
+ sourceBuilder.setSourceServiceId(instanceInventoryCache.get(reference.getParentServiceInstanceId()).getServiceId());
}
sourceBuilder.setDestEndpointId(spanDecorator.getOperationNameId());
- sourceBuilder.setDestServiceInstanceId(segmentCoreInfo.getApplicationInstanceId());
- sourceBuilder.setDestServiceId(segmentCoreInfo.getApplicationId());
+ sourceBuilder.setDestServiceInstanceId(segmentCoreInfo.getServiceInstanceId());
+ sourceBuilder.setDestServiceId(segmentCoreInfo.getServiceId());
sourceBuilder.setDetectPoint(DetectPoint.SERVER);
sourceBuilder.setComponentId(spanDecorator.getComponentId());
setPublicAttrs(sourceBuilder, spanDecorator);
@@ -99,8 +99,8 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
sourceBuilder.setSourceServiceInstanceId(Const.USER_INSTANCE_ID);
sourceBuilder.setSourceServiceId(Const.USER_SERVICE_ID);
sourceBuilder.setDestEndpointId(spanDecorator.getOperationNameId());
- sourceBuilder.setDestServiceInstanceId(segmentCoreInfo.getApplicationInstanceId());
- sourceBuilder.setDestServiceId(segmentCoreInfo.getApplicationId());
+ sourceBuilder.setDestServiceInstanceId(segmentCoreInfo.getServiceInstanceId());
+ sourceBuilder.setDestServiceId(segmentCoreInfo.getServiceId());
sourceBuilder.setDetectPoint(DetectPoint.SERVER);
sourceBuilder.setComponentId(spanDecorator.getComponentId());
@@ -126,8 +126,8 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
int destInstanceId = instanceInventoryCache.getServiceInstanceId(destServiceId, peerId);
sourceBuilder.setSourceEndpointId(Const.USER_ENDPOINT_ID);
- sourceBuilder.setSourceServiceInstanceId(segmentCoreInfo.getApplicationInstanceId());
- sourceBuilder.setSourceServiceId(segmentCoreInfo.getApplicationId());
+ sourceBuilder.setSourceServiceInstanceId(segmentCoreInfo.getServiceInstanceId());
+ sourceBuilder.setSourceServiceId(segmentCoreInfo.getServiceId());
sourceBuilder.setDestEndpointId(spanDecorator.getOperationNameId());
sourceBuilder.setDestServiceInstanceId(destInstanceId);
if (Const.NONE == mappingServiceId) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
index 08bd305..f7e8111 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
@@ -55,13 +55,17 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime());
segment.setSegmentId(segmentCoreInfo.getSegmentId());
- segment.setServiceId(segmentCoreInfo.getApplicationId());
+ segment.setServiceId(segmentCoreInfo.getServiceId());
segment.setLatency((int)(segmentCoreInfo.getEndTime() - segmentCoreInfo.getStartTime()));
segment.setStartTime(segmentCoreInfo.getStartTime());
segment.setEndTime(segmentCoreInfo.getEndTime());
segment.setIsError(BooleanUtils.booleanToValue(segmentCoreInfo.isError()));
segment.setTimeBucket(timeBucket);
segment.setDataBinary(segmentCoreInfo.getDataBinary());
+ /**
+ * Only consider v1, v2 compatible for now.
+ */
+ segment.setVersion(segmentCoreInfo.isV2() ? 2 : 1);
firstEndpointId = spanDecorator.getOperationNameId();
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
index d67d2a2..9e14954 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java
@@ -59,10 +59,10 @@ public class ServiceMappingSpanListener implements EntrySpanListener {
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
int serviceId = serviceInventoryCache.getServiceId(spanDecorator.getRefs(i).getNetworkAddressId());
int mappingServiceId = serviceInventoryCache.get(serviceId).getMappingServiceId();
- if (mappingServiceId != segmentCoreInfo.getApplicationId()) {
+ if (mappingServiceId != segmentCoreInfo.getServiceId()) {
ServiceMapping serviceMapping = new ServiceMapping();
serviceMapping.setServiceId(serviceId);
- serviceMapping.setMappingServiceId(segmentCoreInfo.getApplicationId());
+ serviceMapping.setMappingServiceId(segmentCoreInfo.getServiceId());
serviceMappings.add(serviceMapping);
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
index 7169ea3..8aaad19 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
@@ -53,37 +53,37 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
}
@Override public boolean exchange(ReferenceDecorator standardBuilder, int serviceId) {
- if (standardBuilder.getEntryServiceId() == 0) {
- String entryEndpointName = StringUtils.isNotEmpty(standardBuilder.getEntryServiceName()) ? standardBuilder.getEntryServiceName() : Const.DOMAIN_OPERATION_NAME;
- int entryEndpointId = endpointInventoryRegister.get(serviceInstanceInventoryCache.get(standardBuilder.getEntryApplicationInstanceId()).getServiceId(), entryEndpointName, DetectPoint.SERVER.ordinal());
+ if (standardBuilder.getEntryEndpointId() == 0) {
+ String entryEndpointName = StringUtils.isNotEmpty(standardBuilder.getEntryEndpointName()) ? standardBuilder.getEntryEndpointName() : Const.DOMAIN_OPERATION_NAME;
+ int entryEndpointId = endpointInventoryRegister.get(serviceInstanceInventoryCache.get(standardBuilder.getEntryServiceInstanceId()).getServiceId(), entryEndpointName, DetectPoint.SERVER.ordinal());
if (entryEndpointId == 0) {
if (logger.isDebugEnabled()) {
- int entryServiceId = serviceInstanceInventoryCache.get(standardBuilder.getEntryApplicationInstanceId()).getServiceId();
+ int entryServiceId = serviceInstanceInventoryCache.get(standardBuilder.getEntryServiceInstanceId()).getServiceId();
logger.debug("entry endpoint name: {} from service id: {} exchange failed", entryEndpointName, entryServiceId);
}
return false;
} else {
standardBuilder.toBuilder();
- standardBuilder.setEntryServiceId(entryEndpointId);
- standardBuilder.setEntryServiceName(Const.EMPTY_STRING);
+ standardBuilder.setEntryEndpointId(entryEndpointId);
+ standardBuilder.setEntryEndpointName(Const.EMPTY_STRING);
}
}
- if (standardBuilder.getParentServiceId() == 0) {
- String parentEndpointName = StringUtils.isNotEmpty(standardBuilder.getParentServiceName()) ? standardBuilder.getParentServiceName() : Const.DOMAIN_OPERATION_NAME;
- int parentEndpointId = endpointInventoryRegister.get(serviceInstanceInventoryCache.get(standardBuilder.getParentApplicationInstanceId()).getServiceId(), parentEndpointName, DetectPoint.SERVER.ordinal());
+ if (standardBuilder.getParentEndpointId() == 0) {
+ String parentEndpointName = StringUtils.isNotEmpty(standardBuilder.getParentEndpointName()) ? standardBuilder.getParentEndpointName() : Const.DOMAIN_OPERATION_NAME;
+ int parentEndpointId = endpointInventoryRegister.get(serviceInstanceInventoryCache.get(standardBuilder.getParentServiceInstanceId()).getServiceId(), parentEndpointName, DetectPoint.SERVER.ordinal());
if (parentEndpointId == 0) {
if (logger.isDebugEnabled()) {
- int parentServiceId = serviceInstanceInventoryCache.get(standardBuilder.getParentApplicationInstanceId()).getServiceId();
+ int parentServiceId = serviceInstanceInventoryCache.get(standardBuilder.getParentServiceInstanceId()).getServiceId();
logger.debug("parent endpoint name: {} from service id: {} exchange failed", parentEndpointName, parentServiceId);
}
return false;
} else {
standardBuilder.toBuilder();
- standardBuilder.setParentServiceId(parentEndpointId);
- standardBuilder.setParentServiceName(Const.EMPTY_STRING);
+ standardBuilder.setParentEndpointId(parentEndpointId);
+ standardBuilder.setParentEndpointName(Const.EMPTY_STRING);
}
}