You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2018/10/08 15:17:17 UTC

[GitHub] wu-sheng closed pull request #1733: Trace buffer test success.

wu-sheng closed pull request #1733: Trace buffer test success.
URL: https://github.com/apache/incubator-skywalking/pull/1733
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub will not otherwise display it after the merge):

diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
index 7929436cf..f286a8315 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
@@ -68,7 +68,7 @@ public synchronized void write(AbstractMessageLite messageLite) {
     }
 
     private void tryLock(File directory) {
-        logger.info("Try to lock buffer directory, directory is: " + absolutePath);
+        logger.info("Try to lock buffer directory, directory is: " + directory.getAbsolutePath());
         FileLock lock = null;
 
         try {
@@ -78,10 +78,10 @@ private void tryLock(File directory) {
         }
 
         if (lock == null) {
-            throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + absolutePath);
+            throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + directory.getAbsolutePath());
         }
 
-        logger.info("Lock buffer directory successfully, directory is: " + absolutePath);
+        logger.info("Lock buffer directory successfully, directory is: " + directory.getAbsolutePath());
     }
 
     public static class Builder<MESSAGE_TYPE extends GeneratedMessageV3> {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
index d696a3615..a30c7a46d 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java
@@ -35,13 +35,11 @@
 
     private static final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class);
 
-    private final BufferStream<UpstreamSegment> stream;
+    private final DataCarrier<SegmentStandardization> dataCarrier;
 
     public SegmentStandardizationWorker(SegmentParse segmentParse, String path,
         int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart) throws IOException {
         super(Integer.MAX_VALUE);
-        DataCarrier<SegmentStandardization> dataCarrier = new DataCarrier<>(1, 1024);
-        dataCarrier.consume(new Consumer(this), 1);
 
         BufferStream.Builder<UpstreamSegment> builder = new BufferStream.Builder<>(path);
         builder.cleanWhenRestart(cleanWhenRestart);
@@ -50,21 +48,24 @@ public SegmentStandardizationWorker(SegmentParse segmentParse, String path,
         builder.parser(UpstreamSegment.parser());
         builder.callBack(segmentParse);
 
-        stream = builder.build();
+        BufferStream<UpstreamSegment> stream = builder.build();
         stream.initialize();
+
+        dataCarrier = new DataCarrier<>(1, 1024);
+        dataCarrier.consume(new Consumer(stream), 1);
     }
 
     @Override
     public void in(SegmentStandardization standardization) {
-        stream.write(standardization.getUpstreamSegment());
+        dataCarrier.produce(standardization);
     }
 
     private class Consumer implements IConsumer<SegmentStandardization> {
 
-        private final SegmentStandardizationWorker aggregator;
+        private final BufferStream<UpstreamSegment> stream;
 
-        private Consumer(SegmentStandardizationWorker aggregator) {
-            this.aggregator = aggregator;
+        private Consumer(BufferStream<UpstreamSegment> stream) {
+            this.stream = stream;
         }
 
         @Override
@@ -73,16 +74,8 @@ public void init() {
 
         @Override
         public void consume(List<SegmentStandardization> data) {
-            Iterator<SegmentStandardization> inputIterator = data.iterator();
-
-            int i = 0;
-            while (inputIterator.hasNext()) {
-                SegmentStandardization indicator = inputIterator.next();
-                i++;
-                if (i == data.size()) {
-                    indicator.getEndOfBatchContext().setEndOfBatch(true);
-                }
-                aggregator.in(indicator);
+            for (SegmentStandardization aData : data) {
+                stream.write(aData.getUpstreamSegment());
             }
         }
 
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
index d19be0ee8..a96e56b50 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
@@ -52,6 +52,14 @@ public static void main(String[] args) throws InterruptedException {
         UniqueId.Builder providerSegmentId = UniqueIdBuilder.INSTANCE.create();
         providerMock.mock(streamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp, true);
 
+        TimeUnit.SECONDS.sleep(10);
+
+        globalTraceId = UniqueIdBuilder.INSTANCE.create();
+        consumerSegmentId = UniqueIdBuilder.INSTANCE.create();
+        providerSegmentId = UniqueIdBuilder.INSTANCE.create();
+        consumerMock.mock(streamObserver, globalTraceId, consumerSegmentId, startTimestamp, false);
+        providerMock.mock(streamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp, false);
+
         streamObserver.onCompleted();
         while (!IS_COMPLETED) {
             TimeUnit.MILLISECONDS.sleep(500);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services