You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2018/10/17 11:13:10 UTC

[GitHub] wu-sheng closed pull request #1784: Fixed the segment parse bug. Each use requires the creation of a new …

wu-sheng closed pull request #1784: Fixed the segment parse bug. Each use requires the creation of a new …
URL: https://github.com/apache/incubator-skywalking/pull/1784
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork-based pull request once it has been
merged, the full diff is reproduced below for the sake of provenance:

diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index c7ea0f1dc..c5b831aa8 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -67,12 +67,12 @@ public TraceModuleProvider() {
         GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
         JettyHandlerRegister jettyHandlerRegister = getManager().find(CoreModule.NAME).getService(JettyHandlerRegister.class);
         try {
-            SegmentParse segmentParse = new SegmentParse(getManager(), listenerManager);
-            grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentParse));
-            jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentParse));
+            SegmentParse.Producer segmentProducer = new SegmentParse.Producer(getManager(), listenerManager);
+            grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentProducer));
+            jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentProducer));
 
-            SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(segmentParse, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart());
-            segmentParse.setStandardizationWorker(standardizationWorker);
+            SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(segmentProducer, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart());
+            segmentProducer.setStandardizationWorker(standardizationWorker);
         } catch (IOException e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/grpc/TraceSegmentServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/grpc/TraceSegmentServiceHandler.java
index cbc2d1a6c..cd4baa073 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/grpc/TraceSegmentServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/grpc/TraceSegmentServiceHandler.java
@@ -32,11 +32,11 @@
     private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
 
     private final Boolean debug;
-    private final SegmentParse segmentParse;
+    private final SegmentParse.Producer segmentProducer;
 
-    public TraceSegmentServiceHandler(SegmentParse segmentParse) {
+    public TraceSegmentServiceHandler(SegmentParse.Producer segmentProducer) {
         this.debug = System.getProperty("debug") != null;
-        this.segmentParse = segmentParse;
+        this.segmentProducer = segmentProducer;
     }
 
     @Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
@@ -46,7 +46,7 @@ public TraceSegmentServiceHandler(SegmentParse segmentParse) {
                     logger.debug("receive segment");
                 }
 
-                segmentParse.parse(segment, SegmentParse.Source.Agent);
+                segmentProducer.send(segment, SegmentParse.Source.Agent);
 
                 if (debug) {
                     long count = SegmentCounter.INSTANCE.incrementAndGet();
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/rest/TraceSegmentServletHandler.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/rest/TraceSegmentServletHandler.java
index 0493c0cee..6a3ecb959 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/rest/TraceSegmentServletHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v5/rest/TraceSegmentServletHandler.java
@@ -34,10 +34,10 @@
 
     private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class);
 
-    private final SegmentParse segmentParse;
+    private final SegmentParse.Producer segmentProducer;
 
-    public TraceSegmentServletHandler(SegmentParse segmentParse) {
-        this.segmentParse = segmentParse;
+    public TraceSegmentServletHandler(SegmentParse.Producer segmentProducer) {
+        this.segmentProducer = segmentProducer;
     }
 
     @Override public String pathSpec() {
@@ -71,7 +71,7 @@ private void read(BufferedReader bufferedReader) throws IOException {
         reader.beginArray();
         while (reader.hasNext()) {
             TraceSegment traceSegment = jsonReader.read(reader);
-            segmentParse.parse(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent);
+            segmentProducer.send(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent);
         }
         reader.endArray();
     }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
index d07c4a5a9..4f3e48af3 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
@@ -33,7 +33,7 @@
 /**
  * @author peng-yongsheng
  */
-public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment> {
+public class SegmentParse {
 
     private static final Logger logger = LoggerFactory.getLogger(SegmentParse.class);
 
@@ -43,7 +43,7 @@
     private final SegmentCoreInfo segmentCoreInfo;
     @Setter private SegmentStandardizationWorker standardizationWorker;
 
-    public SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+    private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
         this.moduleManager = moduleManager;
         this.listenerManager = listenerManager;
         this.spanListeners = new LinkedList<>();
@@ -52,10 +52,6 @@ public SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager li
         this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
     }
 
-    @Override public boolean call(UpstreamSegment segment) {
-        return parse(segment, Source.Buffer);
-    }
-
     public boolean parse(UpstreamSegment segment, Source source) {
         createSpanListeners();
 
@@ -220,4 +216,28 @@ private void createSpanListeners() {
     public enum Source {
         Agent, Buffer
     }
+
+    public static class Producer implements DataStreamReader.CallBack<UpstreamSegment> {
+
+        @Setter private SegmentStandardizationWorker standardizationWorker;
+        private final ModuleManager moduleManager;
+        private final SegmentParserListenerManager listenerManager;
+
+        public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
+            this.moduleManager = moduleManager;
+            this.listenerManager = listenerManager;
+        }
+
+        public void send(UpstreamSegment segment, Source source) {
+            SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+            segmentParse.setStandardizationWorker(standardizationWorker);
+            segmentParse.parse(segment, source);
+        }
+
+        @Override public boolean call(UpstreamSegment segment) {
+            SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
+            segmentParse.setStandardizationWorker(standardizationWorker);
+            return segmentParse.parse(segment, Source.Buffer);
+        }
+    }
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
index 49c92b2dd..5cf3c0b93 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
@@ -84,7 +84,7 @@ public void parseFirst(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreI
 
     @Override public void build() {
         if (logger.isDebugEnabled()) {
-            logger.debug("segment duration listener build");
+            logger.debug("segment listener build, segment id: {}", segment.getSegmentId());
         }
 
         if (entryEndpointId == 0) {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
index a30c7a46d..3adc4c8a6 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
@@ -19,7 +19,7 @@
 package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.List;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
@@ -37,7 +37,7 @@
 
     private final DataCarrier<SegmentStandardization> dataCarrier;
 
-    public SegmentStandardizationWorker(SegmentParse segmentParse, String path,
+    public SegmentStandardizationWorker(SegmentParse.Producer segmentParseCreator, String path,
         int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart) throws IOException {
         super(Integer.MAX_VALUE);
 
@@ -46,7 +46,7 @@ public SegmentStandardizationWorker(SegmentParse segmentParse, String path,
         builder.dataFileMaxSize(dataFileMaxSize);
         builder.offsetFileMaxSize(offsetFileMaxSize);
         builder.parser(UpstreamSegment.parser());
-        builder.callBack(segmentParse);
+        builder.callBack(segmentParseCreator);
 
         BufferStream<UpstreamSegment> stream = builder.build();
         stream.initialize();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services