You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/10/17 11:13:13 UTC
[incubator-skywalking] branch master updated: Fixed the segment
parse bug. Each use requires the creation of a new instance. (#1784)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 064595e Fixed the segment parse bug. Each use requires the creation of a new instance. (#1784)
064595e is described below
commit 064595eb8122662f9f7bd2d6c90c9c61ded75535
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Wed Oct 17 19:13:08 2018 +0800
Fixed the segment parse bug. Each use requires the creation of a new instance. (#1784)
---
.../trace/provider/TraceModuleProvider.java | 10 +++----
.../v5/grpc/TraceSegmentServiceHandler.java | 8 +++---
.../v5/rest/TraceSegmentServletHandler.java | 8 +++---
.../trace/provider/parser/SegmentParse.java | 32 ++++++++++++++++++----
.../listener/segment/SegmentSpanListener.java | 2 +-
.../SegmentStandardizationWorker.java | 6 ++--
6 files changed, 43 insertions(+), 23 deletions(-)
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index c7ea0f1..c5b831a 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -67,12 +67,12 @@ public class TraceModuleProvider extends ModuleProvider {
GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
JettyHandlerRegister jettyHandlerRegister = getManager().find(CoreModule.NAME).getService(JettyHandlerRegister.class);
try {
- SegmentParse segmentParse = new SegmentParse(getManager(), listenerManager);
- grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentParse));
- jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentParse));
+ SegmentParse.Producer segmentProducer = new SegmentParse.Producer(getManager(), listenerManager);
+ grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentProducer));
+ jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentProducer));
- SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(segmentParse, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart());
- segmentParse.setStandardizationWorker(standardizationWorker);
+ SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(segmentProducer, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart());
+ segmentProducer.setStandardizationWorker(standardizationWorker);
} catch (IOException e) {
throw new ModuleStartException(e.getMessage(), e);
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/grpc/TraceSegmentServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/grpc/TraceSegmentServiceHandler.java
index cbc2d1a..cd4baa0 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/grpc/TraceSegmentServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/grpc/TraceSegmentServiceHandler.java
@@ -32,11 +32,11 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
private final Boolean debug;
- private final SegmentParse segmentParse;
+ private final SegmentParse.Producer segmentProducer;
- public TraceSegmentServiceHandler(SegmentParse segmentParse) {
+ public TraceSegmentServiceHandler(SegmentParse.Producer segmentProducer) {
this.debug = System.getProperty("debug") != null;
- this.segmentParse = segmentParse;
+ this.segmentProducer = segmentProducer;
}
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
@@ -46,7 +46,7 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
logger.debug("receive segment");
}
- segmentParse.parse(segment, SegmentParse.Source.Agent);
+ segmentProducer.send(segment, SegmentParse.Source.Agent);
if (debug) {
long count = SegmentCounter.INSTANCE.incrementAndGet();
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/rest/TraceSegmentServletHandler.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/rest/TraceSegmentServletHandler.java
index 0493c0c..6a3ecb9 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/rest/TraceSegmentServletHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/rest/TraceSegmentServletHandler.java
@@ -34,10 +34,10 @@ public class TraceSegmentServletHandler extends JettyJsonHandler {
private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class);
- private final SegmentParse segmentParse;
+ private final SegmentParse.Producer segmentProducer;
- public TraceSegmentServletHandler(SegmentParse segmentParse) {
- this.segmentParse = segmentParse;
+ public TraceSegmentServletHandler(SegmentParse.Producer segmentProducer) {
+ this.segmentProducer = segmentProducer;
}
@Override public String pathSpec() {
@@ -71,7 +71,7 @@ public class TraceSegmentServletHandler extends JettyJsonHandler {
reader.beginArray();
while (reader.hasNext()) {
TraceSegment traceSegment = jsonReader.read(reader);
- segmentParse.parse(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent);
+ segmentProducer.send(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent);
}
reader.endArray();
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
index d07c4a5..4f3e48a 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
@@ -33,7 +33,7 @@ import org.slf4j.*;
/**
* @author peng-yongsheng
*/
-public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment> {
+public class SegmentParse {
private static final Logger logger = LoggerFactory.getLogger(SegmentParse.class);
@@ -43,7 +43,7 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
private final SegmentCoreInfo segmentCoreInfo;
@Setter private SegmentStandardizationWorker standardizationWorker;
- public SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+ private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
this.spanListeners = new LinkedList<>();
@@ -52,10 +52,6 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
}
- @Override public boolean call(UpstreamSegment segment) {
- return parse(segment, Source.Buffer);
- }
-
public boolean parse(UpstreamSegment segment, Source source) {
createSpanListeners();
@@ -220,4 +216,28 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
public enum Source {
Agent, Buffer
}
+
+ public static class Producer implements DataStreamReader.CallBack<UpstreamSegment> {
+
+ @Setter private SegmentStandardizationWorker standardizationWorker;
+ private final ModuleManager moduleManager;
+ private final SegmentParserListenerManager listenerManager;
+
+ public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+ this.moduleManager = moduleManager;
+ this.listenerManager = listenerManager;
+ }
+
+ public void send(UpstreamSegment segment, Source source) {
+ SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+ segmentParse.setStandardizationWorker(standardizationWorker);
+ segmentParse.parse(segment, source);
+ }
+
+ @Override public boolean call(UpstreamSegment segment) {
+ SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+ segmentParse.setStandardizationWorker(standardizationWorker);
+ return segmentParse.parse(segment, Source.Buffer);
+ }
+ }
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
index 49c92b2..5cf3c0b 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
@@ -84,7 +84,7 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
@Override public void build() {
if (logger.isDebugEnabled()) {
- logger.debug("segment duration listener build");
+ logger.debug("segment listener build, segment id: {}", segment.getSegmentId());
}
if (entryEndpointId == 0) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
index a30c7a4..3adc4c8 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization;
import java.io.IOException;
-import java.util.*;
+import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
@@ -37,7 +37,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
private final DataCarrier<SegmentStandardization> dataCarrier;
- public SegmentStandardizationWorker(SegmentParse segmentParse, String path,
+ public SegmentStandardizationWorker(SegmentParse.Producer segmentParseCreator, String path,
int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart) throws IOException {
super(Integer.MAX_VALUE);
@@ -46,7 +46,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
builder.dataFileMaxSize(dataFileMaxSize);
builder.offsetFileMaxSize(offsetFileMaxSize);
builder.parser(UpstreamSegment.parser());
- builder.callBack(segmentParse);
+ builder.callBack(segmentParseCreator);
BufferStream<UpstreamSegment> stream = builder.build();
stream.initialize();