You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2018/08/30 04:55:06 UTC

[GitHub] wu-sheng closed pull request #1607: Buffer file implementation

wu-sheng closed pull request #1607: Buffer file implementation
URL: https://github.com/apache/incubator-skywalking/pull/1607
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/apm-commons/apm-util/src/main/java/org/apache/skywalking/apm/util/StringUtil.java b/apm-commons/apm-util/src/main/java/org/apache/skywalking/apm/util/StringUtil.java
index e1ca37830..4c27745e4 100644
--- a/apm-commons/apm-util/src/main/java/org/apache/skywalking/apm/util/StringUtil.java
+++ b/apm-commons/apm-util/src/main/java/org/apache/skywalking/apm/util/StringUtil.java
@@ -16,7 +16,6 @@
  *
  */
 
-
 package org.apache.skywalking.apm.util;
 
 public final class StringUtil {
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 2efac8ad9..56a6836fc 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -55,6 +55,7 @@
         <h2.version>1.4.196</h2.version>
         <shardingjdbc.version>2.0.3</shardingjdbc.version>
         <commons-dbcp.version>1.4</commons-dbcp.version>
+        <commons-io.version>2.6</commons-io.version>
         <elasticsearch.version>6.3.2</elasticsearch.version>
         <joda-time.version>2.9.9</joda-time.version>
         <kubernetes.version>2.0.0</kubernetes.version>
@@ -249,6 +250,11 @@
                 <artifactId>commons-dbcp</artifactId>
                 <version>${commons-dbcp.version}</version>
             </dependency>
+            <dependency>
+                <groupId>commons-io</groupId>
+                <artifactId>commons-io</artifactId>
+                <version>${commons-io.version}</version>
+            </dependency>
             <dependency>
                 <groupId>io.kubernetes</groupId>
                 <artifactId>client-java</artifactId>
diff --git a/oap-server/server-library/library-buffer/pom.xml b/oap-server/server-library/library-buffer/pom.xml
new file mode 100644
index 000000000..8a87b1e41
--- /dev/null
+++ b/oap-server/server-library/library-buffer/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>server-library</artifactId>
+        <groupId>org.apache.skywalking</groupId>
+        <version>6.0.0-alpha-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>library-buffer</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>apm-network</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
new file mode 100644
index 000000000..98570050e
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import java.text.*;
+import java.util.*;
+
+/**
+ * File-name helpers shared by the buffer data and offset streams.
+ *
+ * <p>Names have the form {@code <prefix>-<yyyyMMddHHmmss>.sw}; see {@link #buildFileName(String)}.
+ *
+ * @author peng-yongsheng
+ */
+class BufferFileUtils {
+
+    private BufferFileUtils() {
+    }
+
+    static final String CHARSET = "UTF-8";
+    static final String DATA_FILE_PREFIX = "data";
+    static final String OFFSET_FILE_PREFIX = "offset";
+    private static final String SEPARATOR = "-";
+    private static final String SUFFIX = ".sw";
+    private static final String DATA_FORMAT_STR = "yyyyMMddHHmmss";
+
+    /**
+     * Sorts buffer file names in place, oldest first, by the timestamp embedded in each name.
+     *
+     * @param fileList names of the form produced by {@link #buildFileName(String)}
+     * @throws NumberFormatException if the timestamp part of a name is not numeric
+     */
+    static void sort(String[] fileList) {
+        // Long.compare rather than subtraction: a yyyyMMddHHmmss stamp does not fit in an int.
+        Arrays.sort(fileList, (f1, f2) -> Long.compare(parseTimestamp(f1), parseTimestamp(f2)));
+    }
+
+    /**
+     * Extracts the digits between the first separator and the suffix of a buffer file name.
+     */
+    private static long parseTimestamp(String fileName) {
+        int begin = fileName.indexOf(SEPARATOR) + SEPARATOR.length();
+        int end = fileName.endsWith(SUFFIX) ? fileName.length() - SUFFIX.length() : fileName.length();
+        return Long.parseLong(fileName.substring(begin, end));
+    }
+
+    /**
+     * Builds a file name of the form {@code <prefix>-<yyyyMMddHHmmss>.sw} from the current time.
+     *
+     * @param prefix leading part of the name, e.g. {@link #DATA_FILE_PREFIX}
+     * @return the bare file name, without any directory
+     */
+    static String buildFileName(String prefix) {
+        DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT_STR);
+        return prefix + SEPARATOR + dateFormat.format(new Date()) + SUFFIX;
+    }
+}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
new file mode 100644
index 000000000..7929436cf
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import com.google.protobuf.*;
+import java.io.*;
+import java.nio.channels.FileLock;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.*;
+
+/**
+ * File-backed buffer for protobuf messages, assembled through {@link Builder}.
+ *
+ * <p>{@link #initialize()} locks the buffer directory and sets up the underlying data stream;
+ * {@link #write(AbstractMessageLite)} appends messages to it.
+ *
+ * @author peng-yongsheng
+ */
+public class BufferStream<MESSAGE_TYPE extends GeneratedMessageV3> {
+
+    private static final Logger logger = LoggerFactory.getLogger(BufferStream.class);
+
+    private final String absolutePath;
+    private final boolean cleanWhenRestart;
+    private final int dataFileMaxSize;
+    private final int offsetFileMaxSize;
+    private final Parser<MESSAGE_TYPE> parser;
+    private final DataStreamReader.CallBack<MESSAGE_TYPE> callBack;
+    private DataStream<MESSAGE_TYPE> dataStream;
+    // Kept as fields so the lock file stream stays reachable and the directory lock stays held.
+    private FileOutputStream lockStream;
+    private FileLock directoryLock;
+
+    private BufferStream(String absolutePath, boolean cleanWhenRestart, int dataFileMaxSize, int offsetFileMaxSize,
+        Parser<MESSAGE_TYPE> parser, DataStreamReader.CallBack<MESSAGE_TYPE> callBack) {
+        this.absolutePath = absolutePath;
+        this.cleanWhenRestart = cleanWhenRestart;
+        this.dataFileMaxSize = dataFileMaxSize;
+        this.offsetFileMaxSize = offsetFileMaxSize;
+        this.parser = parser;
+        this.callBack = callBack;
+    }
+
+    /**
+     * Creates the buffer directory if needed, locks it, optionally removes the previous buffer
+     * files, and initializes the data stream.
+     *
+     * @throws IOException if the directory cannot be created or the stream cannot be initialized
+     * @throws RuntimeException if the directory lock cannot be acquired
+     */
+    public synchronized void initialize() throws IOException {
+        File directory = new File(absolutePath);
+        FileUtils.forceMkdir(directory);
+        tryLock(directory);
+
+        // DataStream's constructor takes the offset file size before the data file size.
+        dataStream = new DataStream<>(directory, offsetFileMaxSize, dataFileMaxSize, parser, callBack);
+
+        if (cleanWhenRestart) {
+            dataStream.clean();
+        }
+
+        dataStream.initialize();
+    }
+
+    /**
+     * Appends one message to the buffer. {@link #initialize()} must have been called first.
+     *
+     * @param messageLite the message to append
+     */
+    public synchronized void write(AbstractMessageLite messageLite) {
+        dataStream.getWriter().write(messageLite);
+    }
+
+    /**
+     * Takes an exclusive lock on the {@code lock} file inside the buffer directory.
+     *
+     * @throws RuntimeException if the lock is not acquired
+     */
+    private void tryLock(File directory) {
+        logger.info("Try to lock buffer directory, directory is: " + absolutePath);
+        FileOutputStream stream = null;
+        FileLock lock = null;
+
+        try {
+            stream = new FileOutputStream(new File(directory, "lock"));
+            lock = stream.getChannel().tryLock();
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            // Do not leak the lock file handle when the lock was not obtained.
+            if (lock == null && stream != null) {
+                try {
+                    stream.close();
+                } catch (IOException closeFailure) {
+                    logger.warn(closeFailure.getMessage(), closeFailure);
+                }
+            }
+        }
+
+        if (lock == null) {
+            throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + absolutePath);
+        }
+
+        lockStream = stream;
+        directoryLock = lock;
+        logger.info("Lock buffer directory successfully, directory is: " + absolutePath);
+    }
+
+    /**
+     * Fluent builder for {@link BufferStream}; options that are not set keep their Java defaults.
+     */
+    public static class Builder<MESSAGE_TYPE extends GeneratedMessageV3> {
+
+        private final String absolutePath;
+        private boolean cleanWhenRestart;
+        private int dataFileMaxSize;
+        private int offsetFileMaxSize;
+        private Parser<MESSAGE_TYPE> parser;
+        private DataStreamReader.CallBack<MESSAGE_TYPE> callBack;
+
+        public Builder(String absolutePath) {
+            this.absolutePath = absolutePath;
+        }
+
+        public BufferStream<MESSAGE_TYPE> build() {
+            return new BufferStream<>(absolutePath, cleanWhenRestart, dataFileMaxSize, offsetFileMaxSize, parser, callBack);
+        }
+
+        public Builder<MESSAGE_TYPE> cleanWhenRestart(boolean cleanWhenRestart) {
+            this.cleanWhenRestart = cleanWhenRestart;
+            return this;
+        }
+
+        public Builder<MESSAGE_TYPE> offsetFileMaxSize(int offsetFileMaxSize) {
+            this.offsetFileMaxSize = offsetFileMaxSize;
+            return this;
+        }
+
+        public Builder<MESSAGE_TYPE> dataFileMaxSize(int dataFileMaxSize) {
+            this.dataFileMaxSize = dataFileMaxSize;
+            return this;
+        }
+
+        public Builder<MESSAGE_TYPE> parser(Parser<MESSAGE_TYPE> parser) {
+            this.parser = parser;
+            return this;
+        }
+
+        public Builder<MESSAGE_TYPE> callBack(DataStreamReader.CallBack<MESSAGE_TYPE> callBack) {
+            this.callBack = callBack;
+            return this;
+        }
+    }
+}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java
new file mode 100644
index 000000000..1bb380d66
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import com.google.protobuf.*;
+import java.io.*;
+import lombok.Getter;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.PrefixFileFilter;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+class DataStream<MESSAGE_TYPE extends GeneratedMessageV3> {
+
+    private static final Logger logger = LoggerFactory.getLogger(DataStream.class);
+
+    private final File directory;
+    private final OffsetStream offsetStream;
+    @Getter private final DataStreamReader<MESSAGE_TYPE> reader;
+    @Getter private final DataStreamWriter<MESSAGE_TYPE> writer;
+    private boolean initialized = false;
+
+    DataStream(File directory, int offsetFileMaxSize, int dataFileMaxSize, Parser<MESSAGE_TYPE> parser,
+        DataStreamReader.CallBack<MESSAGE_TYPE> callBack) {
+        this.directory = directory;
+        this.offsetStream = new OffsetStream(directory, offsetFileMaxSize);
+        this.writer = new DataStreamWriter<>(directory, offsetStream.getOffset().getWriteOffset(), dataFileMaxSize);
+        this.reader = new DataStreamReader<>(directory, offsetStream.getOffset().getReadOffset(), parser, callBack);
+    }
+
+    void clean() throws IOException {
+        String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.DATA_FILE_PREFIX));
+        if (fileNames != null) {
+            for (String fileName : fileNames) {
+                File file = new File(directory, fileName);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Delete buffer data file: {}", file.getAbsolutePath());
+                }
+                FileUtils.forceDelete(file);
+            }
+        }
+
+        offsetStream.clean();
+    }
+
+    synchronized void initialize() throws IOException {
+        if (!initialized) {
+            offsetStream.initialize();
+            writer.initialize();
+            reader.initialize();
+            initialized = true;
+        }
+    }
+}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
new file mode 100644
index 000000000..16b0cd9f9
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import com.google.protobuf.*;
+import java.io.*;
+import java.util.concurrent.*;
+import org.apache.commons.io.filefilter.PrefixFileFilter;
+import org.apache.skywalking.apm.util.*;
+import org.slf4j.*;
+
+/**
+ * Reads length-delimited messages from the buffer data files and hands each one to the callback.
+ *
+ * @author peng-yongsheng
+ */
+class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
+
+    private static final Logger logger = LoggerFactory.getLogger(DataStreamReader.class);
+
+    private final File directory;
+    private final Offset.ReadOffset readOffset;
+    private final Parser<MESSAGE_TYPE> parser;
+    private final CallBack<MESSAGE_TYPE> callBack;
+    private InputStream inputStream;
+
+    DataStreamReader(File directory, Offset.ReadOffset readOffset, Parser<MESSAGE_TYPE> parser,
+        CallBack<MESSAGE_TYPE> callBack) {
+        this.directory = directory;
+        this.readOffset = readOffset;
+        this.parser = parser;
+        this.callBack = callBack;
+    }
+
+    /**
+     * Opens the data file to read from and schedules a read every three seconds.
+     */
+    void initialize() {
+        preRead();
+
+        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+            new RunnableWithExceptionProtection(this::read,
+                t -> logger.error("Segment buffer pre read failure.", t)), 3, 3, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Resumes from the file and position recorded in the read offset when that file still exists;
+     * otherwise starts from the beginning of the earliest data file.
+     */
+    private void preRead() {
+        String fileName = readOffset.getFileName();
+        if (StringUtil.isEmpty(fileName)) {
+            openInputStream(readEarliestCreateDataFile(), 0);
+        } else {
+            File dataFile = new File(directory, fileName);
+            if (dataFile.exists()) {
+                // Continue after the last consumed byte instead of replaying the file from the start.
+                openInputStream(dataFile, readOffset.getOffset());
+                read();
+            } else {
+                openInputStream(readEarliestCreateDataFile(), 0);
+            }
+        }
+    }
+
+    /**
+     * Opens {@code readFile} positioned at {@code startOffset}; does nothing when there is no file.
+     */
+    private void openInputStream(File readFile, long startOffset) {
+        if (readFile == null) {
+            logger.warn("No buffer data file to read in directory: {}", directory.getAbsolutePath());
+            return;
+        }
+
+        FileInputStream stream = null;
+        try {
+            stream = new FileInputStream(readFile);
+            // Moving the channel also moves the position the stream reads from.
+            stream.getChannel().position(Math.max(0L, startOffset));
+            inputStream = stream;
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+            if (stream != null) {
+                try {
+                    stream.close();
+                } catch (IOException closeFailure) {
+                    logger.warn(closeFailure.getMessage(), closeFailure);
+                }
+            }
+        }
+    }
+
+    /**
+     * Picks the data file that sorts first and resets the read offset to its beginning.
+     *
+     * @return that file, or {@code null} when the directory holds no data file
+     */
+    private File readEarliestCreateDataFile() {
+        String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.DATA_FILE_PREFIX));
+
+        if (fileNames != null && fileNames.length > 0) {
+            BufferFileUtils.sort(fileNames);
+            readOffset.setFileName(fileNames[0]);
+            readOffset.setOffset(0);
+            return new File(directory, fileNames[0]);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Parses the next message, if any, passes it to the callback and advances the read offset by
+     * the length prefix plus the serialized size of that message.
+     */
+    private void read() {
+        if (inputStream == null) {
+            // No data file could be opened, so there is nothing to parse.
+            return;
+        }
+
+        try {
+            MESSAGE_TYPE messageType = parser.parseDelimitedFrom(inputStream);
+            if (messageType != null) {
+                callBack.call(messageType);
+                final int serialized = messageType.getSerializedSize();
+                final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
+                readOffset.setOffset(readOffset.getOffset() + offset);
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Receives each message read from the buffer.
+     */
+    interface CallBack<MESSAGE_TYPE extends GeneratedMessageV3> {
+        void call(MESSAGE_TYPE message);
+    }
+}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
new file mode 100644
index 000000000..13a88b1a2
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import com.google.protobuf.*;
+import java.io.*;
+import org.apache.commons.io.FileUtils;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.slf4j.*;
+
+/**
+ * Appends length-delimited messages to the current buffer data file and rolls over to a new file
+ * once the configured size is exceeded.
+ *
+ * @author peng-yongsheng
+ */
+class DataStreamWriter<MESSAGE_TYPE extends GeneratedMessageV3> {
+
+    private static final Logger logger = LoggerFactory.getLogger(DataStreamWriter.class);
+
+    private final File directory;
+    private final Offset.WriteOffset writeOffset;
+
+    private final int dataFileMaxSize;
+
+    private boolean initialized = false;
+    private FileOutputStream outputStream;
+
+    DataStreamWriter(File directory, Offset.WriteOffset writeOffset, int dataFileMaxSize) {
+        this.directory = directory;
+        this.dataFileMaxSize = dataFileMaxSize;
+        this.writeOffset = writeOffset;
+    }
+
+    /**
+     * Opens, in append mode, the data file named by the write offset, or a newly created one when
+     * no name is recorded or the recorded file no longer exists. Runs at most once.
+     *
+     * @throws IOException if the data file cannot be created or opened
+     */
+    synchronized void initialize() throws IOException {
+        if (!initialized) {
+            String writeFileName = writeOffset.getFileName();
+
+            File dataFile;
+            if (StringUtil.isEmpty(writeFileName)) {
+                dataFile = createNewFile();
+            } else {
+                dataFile = new File(directory, writeFileName);
+                if (!dataFile.exists()) {
+                    dataFile = createNewFile();
+                }
+            }
+
+            outputStream = FileUtils.openOutputStream(dataFile, true);
+            initialized = true;
+        }
+    }
+
+    /**
+     * Creates a data file named after the current time and points the write offset at its start.
+     *
+     * @return the new (or already existing) data file
+     * @throws IOException if the file cannot be created
+     */
+    private File createNewFile() throws IOException {
+        String fileName = BufferFileUtils.buildFileName(BufferFileUtils.DATA_FILE_PREFIX);
+        File dataFile = new File(directory, fileName);
+
+        boolean created = dataFile.createNewFile();
+        if (!created) {
+            logger.info("The file named {} already exists.", dataFile.getAbsolutePath());
+        } else {
+            logger.info("Create a new buffer data file: {}", dataFile.getAbsolutePath());
+        }
+
+        writeOffset.setOffset(0);
+        writeOffset.setFileName(dataFile.getName());
+
+        return dataFile;
+    }
+
+    /**
+     * Appends one length-delimited message and records the new file position in the write offset.
+     * When the position exceeds {@code dataFileMaxSize} megabytes, later writes go to a new file.
+     *
+     * @param messageLite the message to append
+     */
+    void write(AbstractMessageLite messageLite) {
+        try {
+            messageLite.writeDelimitedTo(outputStream);
+            long position = outputStream.getChannel().position();
+            writeOffset.setOffset(position);
+            if (position > (FileUtils.ONE_MB * dataFileMaxSize)) {
+                File dataFile = createNewFile();
+                // Open the next file first so a failure here leaves the current stream usable,
+                // then release the handle of the file that is now full.
+                FileOutputStream previous = outputStream;
+                outputStream = FileUtils.openOutputStream(dataFile, true);
+                previous.close();
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java
new file mode 100644
index 000000000..09e5936e1
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import lombok.*;
+import org.apache.skywalking.apm.util.StringUtil;
+
+/**
+ * Holds the read position and the write position of the buffer and converts the pair to and from
+ * a single comma-separated line.
+ *
+ * @author peng-yongsheng
+ */
+class Offset {
+
+    private static final String SPLIT_CHARACTER = ",";
+    @Getter private final ReadOffset readOffset;
+    @Getter private final WriteOffset writeOffset;
+
+    Offset() {
+        readOffset = new ReadOffset();
+        writeOffset = new WriteOffset();
+    }
+
+    /**
+     * Renders the offsets as {@code readFile,readOffset,writeFile,writeOffset}.
+     */
+    String serialize() {
+        StringBuilder line = new StringBuilder();
+        line.append(readOffset.getFileName()).append(SPLIT_CHARACTER).append(readOffset.getOffset());
+        line.append(SPLIT_CHARACTER);
+        line.append(writeOffset.getFileName()).append(SPLIT_CHARACTER).append(writeOffset.getOffset());
+        return line.toString();
+    }
+
+    /**
+     * Restores both offsets from a line written by {@link #serialize()}. A value that is empty, or
+     * that does not split into exactly four fields, leaves the offsets untouched.
+     */
+    void deserialize(String value) {
+        if (StringUtil.isEmpty(value)) {
+            return;
+        }
+
+        String[] fields = value.split(SPLIT_CHARACTER);
+        if (fields.length != 4) {
+            return;
+        }
+
+        readOffset.setFileName(fields[0]);
+        readOffset.setOffset(Long.parseLong(fields[1]));
+        writeOffset.setFileName(fields[2]);
+        writeOffset.setOffset(Long.parseLong(fields[3]));
+    }
+
+    /** File name and position of the reading side. */
+    static class ReadOffset {
+        @Getter @Setter private String fileName;
+        @Getter @Setter private long offset = 0;
+    }
+
+    /** File name and position of the writing side. */
+    static class WriteOffset {
+        @Getter @Setter private String fileName;
+        @Getter @Setter private long offset = 0;
+    }
+}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
new file mode 100644
index 000000000..7f4633141
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.util.concurrent.*;
+import lombok.Getter;
+import org.apache.commons.io.*;
+import org.apache.commons.io.filefilter.PrefixFileFilter;
+import org.apache.commons.io.input.ReversedLinesFileReader;
+import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
+import org.slf4j.*;
+
+/**
+ * Persists the buffer's read and write offsets by appending one record line per change to an offset file.
+ *
+ * @author peng-yongsheng
+ */
+class OffsetStream {
+
+    private static final Logger logger = LoggerFactory.getLogger(OffsetStream.class);
+
+    private final File directory;
+    private final int offsetFileMaxSize;
+
+    @Getter private final Offset offset;
+    private File offsetFile;
+    private boolean initialized = false;
+    private String lastOffsetRecord = "";
+
+    OffsetStream(File directory, int offsetFileMaxSize) {
+        this.directory = directory;
+        this.offsetFileMaxSize = offsetFileMaxSize;
+        this.offset = new Offset();
+    }
+
+    /**
+     * Deletes every file in the directory whose name starts with the offset prefix.
+     *
+     * @throws IOException if a file cannot be deleted
+     */
+    void clean() throws IOException {
+        String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.OFFSET_FILE_PREFIX));
+        if (fileNames != null) {
+            for (String fileName : fileNames) {
+                File file = new File(directory, fileName);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Delete buffer offset file: {}", file.getAbsolutePath());
+                }
+                FileUtils.forceDelete(file);
+            }
+        }
+    }
+
+    /**
+     * Selects the newest existing offset file (or creates one), restores the offsets from its last
+     * line and starts the background flush, which runs every second after a two second delay.
+     *
+     * @throws IOException if the offset file cannot be created or read
+     */
+    synchronized void initialize() throws IOException {
+        if (!initialized) {
+            String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.OFFSET_FILE_PREFIX));
+            if (fileNames != null && fileNames.length > 0) {
+                // Names share a prefix and a fixed-width timestamp, so the greatest name is the newest file.
+                String latest = fileNames[0];
+                for (int i = 1; i < fileNames.length; i++) {
+                    if (fileNames[i].compareTo(latest) > 0) {
+                        latest = fileNames[i];
+                    }
+                }
+                offsetFile = new File(directory, latest);
+            } else {
+                offsetFile = newFile();
+            }
+            offset.deserialize(readLastLine());
+            initialized = true;
+
+            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+                new RunnableWithExceptionProtection(this::flush,
+                    t -> logger.error("Flush offset file in background failure.", t)
+                ), 2, 1, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Appends the current offsets as a new line when they differ from the last line written,
+     * switching to a fresh offset file first once the current one has reached its size limit.
+     *
+     * @throws RuntimeException wrapping any {@link IOException} raised while writing
+     */
+    void flush() {
+        try {
+            String offsetRecord = offset.serialize();
+            logger.debug("flush offset, record: {}", offsetRecord);
+            if (!lastOffsetRecord.equals(offsetRecord)) {
+                if (offsetFile.length() >= FileUtils.ONE_MB * offsetFileMaxSize) {
+                    nextFile();
+                }
+
+                try (OutputStream out = new BufferedOutputStream(FileUtils.openOutputStream(offsetFile, true))) {
+                    IOUtils.write(offsetRecord, out, Charset.forName(BufferFileUtils.CHARSET));
+                    IOUtils.write(System.lineSeparator(), out, Charset.forName(BufferFileUtils.CHARSET));
+                }
+                lastOffsetRecord = offsetRecord;
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Replaces the current offset file with a newly created one. The caller writes the pending
+     * record afterwards, so nothing is flushed from here.
+     */
+    private void nextFile() throws IOException {
+        File newOffsetFile = newFile();
+        if (!offsetFile.delete()) {
+            logger.warn("Offset file {} delete failure.", offsetFile.getAbsolutePath());
+        }
+        offsetFile = newOffsetFile;
+    }
+
+    /**
+     * Creates an offset file named after the current time.
+     */
+    private File newFile() throws IOException {
+        String fileName = BufferFileUtils.buildFileName(BufferFileUtils.OFFSET_FILE_PREFIX);
+        File file = new File(directory, fileName);
+        if (file.createNewFile()) {
+            logger.info("Create a new offset file {}", fileName);
+        }
+        return file;
+    }
+
+    /**
+     * Reads the last line of the current offset file.
+     *
+     * @return the last line, or {@code null} if the reader finds none
+     */
+    private String readLastLine() throws IOException {
+        Charset charset = Charset.forName(BufferFileUtils.CHARSET);
+        try (ReversedLinesFileReader reader = new ReversedLinesFileReader(offsetFile, charset)) {
+            return reader.readLine();
+        }
+    }
+}
diff --git a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
new file mode 100644
index 000000000..cff910085
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.apm.network.language.agent.*;
+import org.slf4j.*;
+
+/**
+ * Manual smoke test for {@link BufferStream}: builds a stream over a local directory,
+ * writes 100 trace segments into it and logs every segment handed back to the callback.
+ *
+ * The buffer directory may be passed as the first program argument; when omitted the
+ * original default location is used.
+ *
+ * @author peng-yongsheng
+ */
+public class BufferStreamTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(BufferStreamTestCase.class);
+
+    /** Directory used when no directory is given on the command line. */
+    private static final String DEFAULT_DIRECTORY = "/Users/pengys5/code/sky-walking/buffer-test";
+
+    /**
+     * Runs the smoke test.
+     *
+     * @param args optional; {@code args[0]} overrides the buffer directory
+     * @throws IOException propagated from the buffer stream
+     * @throws InterruptedException if the start-up pause is interrupted
+     */
+    public static void main(String[] args) throws IOException, InterruptedException {
+        // Allow the directory to be overridden so the test is not tied to one machine.
+        String directory = args.length > 0 ? args[0] : DEFAULT_DIRECTORY;
+        BufferStream.Builder<TraceSegmentObject> builder = new BufferStream.Builder<>(directory);
+        builder.cleanWhenRestart(true);
+        // NOTE(review): sizes are presumably in megabytes; small values force frequent roll-over - confirm.
+        builder.dataFileMaxSize(1);
+        builder.offsetFileMaxSize(1);
+        builder.parser(TraceSegmentObject.parser());
+        builder.callBack(new SegmentParse());
+
+        BufferStream<TraceSegmentObject> stream = builder.build();
+        stream.initialize();
+
+        // NOTE(review): presumably gives the stream time to finish starting before writing - confirm.
+        TimeUnit.SECONDS.sleep(5);
+
+        // Filler text that makes each segment large enough to fill the buffer files quickly.
+        String str = "2018-08-27 11:59:45,261 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" +
+            "main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" +
+            "main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" +
+            "main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28";
+
+        for (int i = 0; i < 100; i++) {
+            TraceSegmentObject.Builder segment = TraceSegmentObject.newBuilder();
+            SpanObject.Builder span = SpanObject.newBuilder();
+
+            // Prefix the sequence number so each segment can be told apart in the log output.
+            span.setOperationName(i + "  " + str);
+            segment.addSpans(span);
+            stream.write(segment.build());
+        }
+
+    }
+
+    /**
+     * Callback that logs the operation name of the first span of every segment it receives.
+     */
+    private static class SegmentParse implements DataStreamReader.CallBack<TraceSegmentObject> {
+
+        @Override public void call(TraceSegmentObject message) {
+            logger.info("segment parse: {}", message.getSpans(0).getOperationName());
+        }
+    }
+}
diff --git a/oap-server/server-library/library-buffer/src/test/resources/log4j2.xml b/oap-server/server-library/library-buffer/src/test/resources/log4j2.xml
new file mode 100644
index 000000000..6eb5b3fb9
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/test/resources/log4j2.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<Configuration status="DEBUG">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="DEBUG">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileUtils.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileUtils.java
new file mode 100644
index 000000000..9776730a1
--- /dev/null
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util;
+
+import java.io.*;
+import org.slf4j.*;
+
+/**
+ * File helpers for reading the last line of a text file and appending a line to it.
+ *
+ * Both helpers work on raw bytes through {@link RandomAccessFile}: each written character
+ * is reduced to its low-order byte and each byte read back becomes one character, so
+ * only single-byte (ASCII / ISO-8859-1) content round-trips unchanged.
+ *
+ * @author peng-yongsheng
+ */
+public enum FileUtils {
+    INSTANCE;
+
+    private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);
+
+    /**
+     * Returns the last line of the given file.
+     *
+     * A single trailing line terminator ({@code \n} or {@code \r\n}) is ignored, so a file
+     * that ends with a newline yields the text before that newline instead of {@code null}.
+     * The scan covers the whole file including its first byte, so a file whose only line
+     * is preceded by a newline (the layout produced by {@link #writeAppendToLast}) is read
+     * correctly.
+     *
+     * @param file the file to read
+     * @return the last line without its terminator; an empty string when the file is empty,
+     * contains only a line terminator, or cannot be read (the failure is logged)
+     */
+    public String readLastLine(File file) {
+        try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
+            // 'end' is the exclusive end offset of the last line's content.
+            long end = randomAccessFile.length();
+            if (end > 0 && byteAt(randomAccessFile, end - 1) == '\n') {
+                end--;
+                if (end > 0 && byteAt(randomAccessFile, end - 1) == '\r') {
+                    end--;
+                }
+            }
+            if (end == 0) {
+                return "";
+            }
+
+            // Walk backwards until the byte just before 'start' is a newline or the file begins.
+            long start = end;
+            while (start > 0 && byteAt(randomAccessFile, start - 1) != '\n') {
+                start--;
+            }
+
+            randomAccessFile.seek(start);
+            String line = randomAccessFile.readLine();
+            return line == null ? "" : line;
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+            return "";
+        }
+    }
+
+    /**
+     * Appends a line separator followed by {@code value} to the end of the file.
+     *
+     * When {@code randomAccessFile} is supplied it is used as-is and left open for the
+     * caller. When it is {@code null} the file is opened here in {@code "rwd"} mode and
+     * closed again before returning. Any I/O failure, including a failure to open the
+     * file, is logged and the method returns without writing.
+     *
+     * @param file the file to open when no open handle is supplied
+     * @param randomAccessFile an already open handle to write through, or {@code null}
+     * @param value the text to append after the line separator
+     */
+    public void writeAppendToLast(File file, RandomAccessFile randomAccessFile, String value) {
+        if (randomAccessFile != null) {
+            try {
+                append(randomAccessFile, value);
+            } catch (IOException e) {
+                logger.error(e.getMessage(), e);
+            }
+            return;
+        }
+
+        // The handle is opened here, so it must also be closed here.
+        try (RandomAccessFile opened = new RandomAccessFile(file, "rwd")) {
+            append(opened, value);
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /** Reads the single byte stored at {@code position}, or -1 at end of file. */
+    private static int byteAt(RandomAccessFile source, long position) throws IOException {
+        source.seek(position);
+        return source.read();
+    }
+
+    /** Positions {@code target} at its end and writes a line separator followed by {@code value}. */
+    private static void append(RandomAccessFile target, String value) throws IOException {
+        target.seek(target.length());
+        target.writeBytes(System.lineSeparator());
+        target.writeBytes(value);
+    }
+}
diff --git a/oap-server/server-library/pom.xml b/oap-server/server-library/pom.xml
index 5df8a5a0f..ac79fb6d1 100644
--- a/oap-server/server-library/pom.xml
+++ b/oap-server/server-library/pom.xml
@@ -34,5 +34,13 @@
         <module>library-server</module>
         <module>library-util</module>
         <module>library-client</module>
+        <module>library-buffer</module>
     </modules>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>apm-util</artifactId>
+        </dependency>
+    </dependencies>
 </project>
\ No newline at end of file
diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml
index 9d0ddd0d4..75a318fba 100644
--- a/oap-server/server-starter/pom.xml
+++ b/oap-server/server-starter/pom.xml
@@ -79,6 +79,11 @@
             <artifactId>skywalking-jvm-receiver-plugin</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>skywalking-trace-receiver-plugin</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!-- receiver module -->
 
         <!-- storage module -->


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services