You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/11/25 08:49:39 UTC
[incubator-skywalking] branch master updated: 1. Close the output
stream when switch to write next file. (#1958)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 983f761 1. Close the output stream when switch to write next file. (#1958)
983f761 is described below
commit 983f7613b6f88c01ef730cad26bd78efb3a40d92
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Sun Nov 25 16:49:35 2018 +0800
1. Close the output stream when switch to write next file. (#1958)
2. Close the input stream when switch to read next file.
#1665
---
.../oap/server/library/buffer/BufferFileUtils.java | 3 ++
.../server/library/buffer/DataStreamReader.java | 7 +++-
.../server/library/buffer/DataStreamWriter.java | 1 +
.../library/buffer/BufferFileUtilsTestCase.java} | 46 +++++++++++-----------
.../library-buffer/src/test/resources/log4j2.xml | 2 +-
.../server/receiver/trace/mock/AgentDataMock.java | 16 ++++----
6 files changed, 42 insertions(+), 33 deletions(-)
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
index d39709b..e27d688 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
@@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.library.buffer;
import java.util.Arrays;
/**
+ * This class is a util for sort or build file name for the gRPC streaming data.
+ * Sort the files by the created time in order to read the data file sequential.
+ *
* @author peng-yongsheng
*/
class BufferFileUtils {
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
index 79b2c8e..b8cf17b 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.library.buffer;
import com.google.protobuf.*;
import java.io.*;
+import java.util.Objects;
import java.util.concurrent.*;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.PrefixFileFilter;
@@ -78,8 +79,12 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
private void openInputStream(File readingFile) {
try {
this.readingFile = readingFile;
+ if (Objects.nonNull(inputStream)) {
+ inputStream.close();
+ }
+
inputStream = new FileInputStream(readingFile);
- } catch (FileNotFoundException e) {
+ } catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
index cb1a0c6..d4d0c87 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
@@ -88,6 +88,7 @@ class DataStreamWriter<MESSAGE_TYPE extends GeneratedMessageV3> {
writeOffset.setOffset(position);
if (position >= (FileUtils.ONE_MB * dataFileMaxSize)) {
File writingFile = createNewFile();
+ outputStream.close();
outputStream = FileUtils.openOutputStream(writingFile, true);
}
} catch (IOException e) {
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtilsTestCase.java
similarity index 53%
copy from oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
copy to oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtilsTestCase.java
index d39709b..3eb69b1 100644
--- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
+++ b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtilsTestCase.java
@@ -18,32 +18,30 @@
package org.apache.skywalking.oap.server.library.buffer;
-import java.util.Arrays;
+import java.util.*;
+import org.junit.*;
/**
* @author peng-yongsheng
*/
-class BufferFileUtils {
-
- private BufferFileUtils() {
- }
-
- static final String CHARSET = "UTF-8";
- static final String DATA_FILE_PREFIX = "data";
- static final String OFFSET_FILE_PREFIX = "offset";
- private static final String SEPARATOR = "-";
- private static final String SUFFIX = ".sw";
-
- static void sort(String[] fileList) {
- Arrays.sort(fileList, (f1, f2) -> {
- long t1 = Long.parseLong(f1.substring(0, f1.length() - 3).split(SEPARATOR)[1]);
- long t2 = Long.parseLong(f2.substring(0, f2.length() - 3).split(SEPARATOR)[1]);
-
- return (int)(t1 - t2);
- });
- }
-
- static String buildFileName(String prefix) {
- return prefix + SEPARATOR + System.currentTimeMillis() + SUFFIX;
+public class BufferFileUtilsTestCase {
+
+ @Test
+ public void testSort() {
+ List<String> fileNames = new ArrayList<>();
+ fileNames.add("data-1.sw");
+ fileNames.add("data-3.sw");
+ fileNames.add("data-2.sw");
+ fileNames.add("data-8.sw");
+ fileNames.add("data-5.sw");
+
+ String[] files = fileNames.toArray(new String[0]);
+ BufferFileUtils.sort(files);
+
+ Assert.assertEquals("data-1.sw", files[0]);
+ Assert.assertEquals("data-2.sw", files[1]);
+ Assert.assertEquals("data-3.sw", files[2]);
+ Assert.assertEquals("data-5.sw", files[3]);
+ Assert.assertEquals("data-8.sw", files[4]);
}
-}
\ No newline at end of file
+}
diff --git a/oap-server/server-library/library-buffer/src/test/resources/log4j2.xml b/oap-server/server-library/library-buffer/src/test/resources/log4j2.xml
index 6eb5b3f..4151264 100644
--- a/oap-server/server-library/library-buffer/src/test/resources/log4j2.xml
+++ b/oap-server/server-library/library-buffer/src/test/resources/log4j2.xml
@@ -20,7 +20,7 @@
<Configuration status="DEBUG">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
- <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
+ <PatternLayout charset="UTF-8" pattern="%d - %c - %L [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
index d2b7f03..924d1e2 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java
@@ -63,13 +63,15 @@ public class AgentDataMock {
TimeUnit.SECONDS.sleep(10);
- globalTraceId = UniqueIdBuilder.INSTANCE.create();
- serviceASegmentId = UniqueIdBuilder.INSTANCE.create();
- serviceBSegmentId = UniqueIdBuilder.INSTANCE.create();
- serviceCSegmentId = UniqueIdBuilder.INSTANCE.create();
- serviceAMock.mock(streamObserver, globalTraceId, serviceASegmentId, startTimestamp, false);
- serviceBMock.mock(streamObserver, globalTraceId, serviceBSegmentId, serviceASegmentId, startTimestamp, false);
- serviceCMock.mock(streamObserver, globalTraceId, serviceCSegmentId, serviceBSegmentId, startTimestamp, false);
+ for (int i = 0; i < 500; i++) {
+ globalTraceId = UniqueIdBuilder.INSTANCE.create();
+ serviceASegmentId = UniqueIdBuilder.INSTANCE.create();
+ serviceBSegmentId = UniqueIdBuilder.INSTANCE.create();
+ serviceCSegmentId = UniqueIdBuilder.INSTANCE.create();
+ serviceAMock.mock(streamObserver, globalTraceId, serviceASegmentId, startTimestamp, false);
+ serviceBMock.mock(streamObserver, globalTraceId, serviceBSegmentId, serviceASegmentId, startTimestamp, false);
+ serviceCMock.mock(streamObserver, globalTraceId, serviceCSegmentId, serviceBSegmentId, startTimestamp, false);
+ }
streamObserver.onCompleted();
while (!IS_COMPLETED) {