You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/10/17 11:13:13 UTC

[incubator-skywalking] branch master updated: Fixed the segment parse bug. Each use requires the creation of a new instance. (#1784)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 064595e  Fixed the segment parse bug. Each use requires the creation of a new instance. (#1784)
064595e is described below

commit 064595eb8122662f9f7bd2d6c90c9c61ded75535
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Wed Oct 17 19:13:08 2018 +0800

    Fixed the segment parse bug. Each use requires the creation of a new instance. (#1784)
---
 .../trace/provider/TraceModuleProvider.java        | 10 +++----
 .../v5/grpc/TraceSegmentServiceHandler.java        |  8 +++---
 .../v5/rest/TraceSegmentServletHandler.java        |  8 +++---
 .../trace/provider/parser/SegmentParse.java        | 32 ++++++++++++++++++----
 .../listener/segment/SegmentSpanListener.java      |  2 +-
 .../SegmentStandardizationWorker.java              |  6 ++--
 6 files changed, 43 insertions(+), 23 deletions(-)

diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index c7ea0f1..c5b831a 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -67,12 +67,12 @@ public class TraceModuleProvider extends ModuleProvider {
         GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
         JettyHandlerRegister jettyHandlerRegister = getManager().find(CoreModule.NAME).getService(JettyHandlerRegister.class);
         try {
-            SegmentParse segmentParse = new SegmentParse(getManager(), listenerManager);
-            grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentParse));
-            jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentParse));
+            SegmentParse.Producer segmentProducer = new SegmentParse.Producer(getManager(), listenerManager);
+            grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentProducer));
+            jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentProducer));
 
-            SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(segmentParse, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart());
-            segmentParse.setStandardizationWorker(standardizationWorker);
+            SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(segmentProducer, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart());
+            segmentProducer.setStandardizationWorker(standardizationWorker);
         } catch (IOException e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/grpc/TraceSegmentServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/grpc/TraceSegmentServiceHandler.java
index cbc2d1a..cd4baa0 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/grpc/TraceSegmentServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/grpc/TraceSegmentServiceHandler.java
@@ -32,11 +32,11 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
     private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
 
     private final Boolean debug;
-    private final SegmentParse segmentParse;
+    private final SegmentParse.Producer segmentProducer;
 
-    public TraceSegmentServiceHandler(SegmentParse segmentParse) {
+    public TraceSegmentServiceHandler(SegmentParse.Producer segmentProducer) {
         this.debug = System.getProperty("debug") != null;
-        this.segmentParse = segmentParse;
+        this.segmentProducer = segmentProducer;
     }
 
     @Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
@@ -46,7 +46,7 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
                     logger.debug("receive segment");
                 }
 
-                segmentParse.parse(segment, SegmentParse.Source.Agent);
+                segmentProducer.send(segment, SegmentParse.Source.Agent);
 
                 if (debug) {
                     long count = SegmentCounter.INSTANCE.incrementAndGet();
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/rest/TraceSegmentServletHandler.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/rest/TraceSegmentServletHandler.java
index 0493c0c..6a3ecb9 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/rest/TraceSegmentServletHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/rest/TraceSegmentServletHandler.java
@@ -34,10 +34,10 @@ public class TraceSegmentServletHandler extends JettyJsonHandler {
 
     private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class);
 
-    private final SegmentParse segmentParse;
+    private final SegmentParse.Producer segmentProducer;
 
-    public TraceSegmentServletHandler(SegmentParse segmentParse) {
-        this.segmentParse = segmentParse;
+    public TraceSegmentServletHandler(SegmentParse.Producer segmentProducer) {
+        this.segmentProducer = segmentProducer;
     }
 
     @Override public String pathSpec() {
@@ -71,7 +71,7 @@ public class TraceSegmentServletHandler extends JettyJsonHandler {
         reader.beginArray();
         while (reader.hasNext()) {
             TraceSegment traceSegment = jsonReader.read(reader);
-            segmentParse.parse(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent);
+            segmentProducer.send(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent);
         }
         reader.endArray();
     }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
index d07c4a5..4f3e48a 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
@@ -33,7 +33,7 @@ import org.slf4j.*;
 /**
  * @author peng-yongsheng
  */
-public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment> {
+public class SegmentParse {
 
     private static final Logger logger = LoggerFactory.getLogger(SegmentParse.class);
 
@@ -43,7 +43,7 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
     private final SegmentCoreInfo segmentCoreInfo;
     @Setter private SegmentStandardizationWorker standardizationWorker;
 
-    public SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+    private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
         this.moduleManager = moduleManager;
         this.listenerManager = listenerManager;
         this.spanListeners = new LinkedList<>();
@@ -52,10 +52,6 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
         this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
     }
 
-    @Override public boolean call(UpstreamSegment segment) {
-        return parse(segment, Source.Buffer);
-    }
-
     public boolean parse(UpstreamSegment segment, Source source) {
         createSpanListeners();
 
@@ -220,4 +216,28 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
     public enum Source {
         Agent, Buffer
     }
+
+    public static class Producer implements DataStreamReader.CallBack<UpstreamSegment> {
+
+        @Setter private SegmentStandardizationWorker standardizationWorker;
+        private final ModuleManager moduleManager;
+        private final SegmentParserListenerManager listenerManager;
+
+        public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+            this.moduleManager = moduleManager;
+            this.listenerManager = listenerManager;
+        }
+
+        public void send(UpstreamSegment segment, Source source) {
+            SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+            segmentParse.setStandardizationWorker(standardizationWorker);
+            segmentParse.parse(segment, source);
+        }
+
+        @Override public boolean call(UpstreamSegment segment) {
+            SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+            segmentParse.setStandardizationWorker(standardizationWorker);
+            return segmentParse.parse(segment, Source.Buffer);
+        }
+    }
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
index 49c92b2..5cf3c0b 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
@@ -84,7 +84,7 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
 
     @Override public void build() {
         if (logger.isDebugEnabled()) {
-            logger.debug("segment duration listener build");
+            logger.debug("segment listener build, segment id: {}", segment.getSegmentId());
         }
 
         if (entryEndpointId == 0) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
index a30c7a4..3adc4c8 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
@@ -19,7 +19,7 @@
 package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.List;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
@@ -37,7 +37,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
 
     private final DataCarrier<SegmentStandardization> dataCarrier;
 
-    public SegmentStandardizationWorker(SegmentParse segmentParse, String path,
+    public SegmentStandardizationWorker(SegmentParse.Producer segmentParseCreator, String path,
         int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart) throws IOException {
         super(Integer.MAX_VALUE);
 
@@ -46,7 +46,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
         builder.dataFileMaxSize(dataFileMaxSize);
         builder.offsetFileMaxSize(offsetFileMaxSize);
         builder.parser(UpstreamSegment.parser());
-        builder.callBack(segmentParse);
+        builder.callBack(segmentParseCreator);
 
         BufferStream<UpstreamSegment> stream = builder.build();
         stream.initialize();