You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/11/25 08:49:39 UTC

[incubator-skywalking] branch master updated: 1. Close the output stream when switch to write next file. (#1958)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 983f761  1. Close the output stream when switch to write next file. (#1958)
983f761 is described below

commit 983f7613b6f88c01ef730cad26bd78efb3a40d92
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Sun Nov 25 16:49:35 2018 +0800

    1. Close the output stream when switch to write next file. (#1958)
    
    2. Close the input stream when switch to read next file.
    
    #1665
---
 .../oap/server/library/buffer/BufferFileUtils.java |  3 ++
 .../server/library/buffer/DataStreamReader.java    |  7 +++-
 .../server/library/buffer/DataStreamWriter.java    |  1 +
 .../library/buffer/BufferFileUtilsTestCase.java}   | 46 +++++++++++-----------
 .../library-buffer/src/test/resources/log4j2.xml   |  2 +-
 .../server/receiver/trace/mock/AgentDataMock.java  | 16 ++++----
 6 files changed, 42 insertions(+), 33 deletions(-)

diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
index d39709b..e27d688 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
@@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.library.buffer;
 import java.util.Arrays;
 
 /**
+ * Utility class for sorting and building the names of the buffer files that hold gRPC streaming data.
+ * Files are sorted by their creation time so that the data files can be read sequentially.
+ *
  * @author peng-yongsheng
  */
 class BufferFileUtils {
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
index 79b2c8e..b8cf17b 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.library.buffer;
 
 import com.google.protobuf.*;
 import java.io.*;
+import java.util.Objects;
 import java.util.concurrent.*;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.PrefixFileFilter;
@@ -78,8 +79,12 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
     private void openInputStream(File readingFile) {
         try {
             this.readingFile = readingFile;
+            if (Objects.nonNull(inputStream)) {
+                inputStream.close();
+            }
+
             inputStream = new FileInputStream(readingFile);
-        } catch (FileNotFoundException e) {
+        } catch (IOException e) {
             logger.error(e.getMessage(), e);
         }
     }
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
index cb1a0c6..d4d0c87 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
@@ -88,6 +88,7 @@ class DataStreamWriter<MESSAGE_TYPE extends GeneratedMessageV3> {
             writeOffset.setOffset(position);
             if (position >= (FileUtils.ONE_MB * dataFileMaxSize)) {
                 File writingFile = createNewFile();
+                outputStream.close();
                 outputStream = FileUtils.openOutputStream(writingFile, true);
             }
         } catch (IOException e) {
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtilsTestCase.java
similarity index 53%
copy from oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
copy to oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtilsTestCase.java
index d39709b..3eb69b1 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
+++ b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtilsTestCase.java
@@ -18,32 +18,30 @@
 
 package org.apache.skywalking.oap.server.library.buffer;
 
-import java.util.Arrays;
+import java.util.*;
+import org.junit.*;
 
 /**
  * @author peng-yongsheng
  */
-class BufferFileUtils {
-
-    private BufferFileUtils() {
-    }
-
-    static final String CHARSET = "UTF-8";
-    static final String DATA_FILE_PREFIX = "data";
-    static final String OFFSET_FILE_PREFIX = "offset";
-    private static final String SEPARATOR = "-";
-    private static final String SUFFIX = ".sw";
-
-    static void sort(String[] fileList) {
-        Arrays.sort(fileList, (f1, f2) -> {
-            long t1 = Long.parseLong(f1.substring(0, f1.length() - 3).split(SEPARATOR)[1]);
-            long t2 = Long.parseLong(f2.substring(0, f2.length() - 3).split(SEPARATOR)[1]);
-
-            return (int)(t1 - t2);
-        });
-    }
-
-    static String buildFileName(String prefix) {
-        return prefix + SEPARATOR + System.currentTimeMillis() + SUFFIX;
+public class BufferFileUtilsTestCase {
+
+    @Test
+    public void testSort() {
+        List<String> fileNames = new ArrayList<>();
+        fileNames.add("data-1.sw");
+        fileNames.add("data-3.sw");
+        fileNames.add("data-2.sw");
+        fileNames.add("data-8.sw");
+        fileNames.add("data-5.sw");
+
+        String[] files = fileNames.toArray(new String[0]);
+        BufferFileUtils.sort(files);
+
+        Assert.assertEquals("data-1.sw", files[0]);
+        Assert.assertEquals("data-2.sw", files[1]);
+        Assert.assertEquals("data-3.sw", files[2]);
+        Assert.assertEquals("data-5.sw", files[3]);
+        Assert.assertEquals("data-8.sw", files[4]);
     }
-}
\ No newline at end of file
+}
diff --git a/oap-server/server-library/library-buffer/src/test/resources/log4j2.xml b/oap-server/server-library/library-buffer/src/test/resources/log4j2.xml
index 6eb5b3f..4151264 100644
--- a/oap-server/server-library/library-buffer/src/test/resources/log4j2.xml
+++ b/oap-server/server-library/library-buffer/src/test/resources/log4j2.xml
@@ -20,7 +20,7 @@
 <Configuration status="DEBUG">
     <Appenders>
         <Console name="Console" target="SYSTEM_OUT">
-            <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
+            <PatternLayout charset="UTF-8" pattern="%d - %c - %L [%t] %-5p %x - %m%n"/>
         </Console>
     </Appenders>
     <Loggers>
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
index d2b7f03..924d1e2 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
@@ -63,13 +63,15 @@ public class AgentDataMock {
 
         TimeUnit.SECONDS.sleep(10);
 
-        globalTraceId = UniqueIdBuilder.INSTANCE.create();
-        serviceASegmentId = UniqueIdBuilder.INSTANCE.create();
-        serviceBSegmentId = UniqueIdBuilder.INSTANCE.create();
-        serviceCSegmentId = UniqueIdBuilder.INSTANCE.create();
-        serviceAMock.mock(streamObserver, globalTraceId, serviceASegmentId, startTimestamp, false);
-        serviceBMock.mock(streamObserver, globalTraceId, serviceBSegmentId, serviceASegmentId, startTimestamp, false);
-        serviceCMock.mock(streamObserver, globalTraceId, serviceCSegmentId, serviceBSegmentId, startTimestamp, false);
+        for (int i = 0; i < 500; i++) {
+            globalTraceId = UniqueIdBuilder.INSTANCE.create();
+            serviceASegmentId = UniqueIdBuilder.INSTANCE.create();
+            serviceBSegmentId = UniqueIdBuilder.INSTANCE.create();
+            serviceCSegmentId = UniqueIdBuilder.INSTANCE.create();
+            serviceAMock.mock(streamObserver, globalTraceId, serviceASegmentId, startTimestamp, false);
+            serviceBMock.mock(streamObserver, globalTraceId, serviceBSegmentId, serviceASegmentId, startTimestamp, false);
+            serviceCMock.mock(streamObserver, globalTraceId, serviceCSegmentId, serviceBSegmentId, startTimestamp, false);
+        }
 
         streamObserver.onCompleted();
         while (!IS_COMPLETED) {