You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2020/12/16 07:54:26 UTC
[skywalking-satellite] 01/01: mmap-queue-plugin
This is an automated email from the ASF dual-hosted git repository.
liujiapeng pushed a commit to branch mmap-queue
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
commit f2dfcfe1cf6a4881bf5a9d3537cb3661fbe5ee6b
Author: Evan <ev...@outlook.com>
AuthorDate: Wed Dec 16 15:53:59 2020 +0800
mmap-queue-plugin
---
Makefile | 5 +-
dist/LICENSE | 3 +-
dist/licenses/LICENSE-go-cmp | 27 ++
dist/licenses/LICENSE-mmap | 21 ++
docs/configuration/queue.md | 11 +
docs/plugins/queue/mmap/README.md | 66 ++++
go.mod | 3 +-
go.sum | 18 +-
internal/pkg/event/event.go | 53 +--
internal/satellite/event/event.go | 109 ------
.../satellite/module/gatherer/fetcher_gatherer.go | 26 +-
.../satellite/module/gatherer/receiver_gatherer.go | 25 +-
internal/satellite/module/processor/processor.go | 2 +-
internal/satellite/module/sender/sender.go | 4 +-
plugins/fetcher/api/fetcher.go | 2 +-
plugins/init.go | 2 +-
plugins/parser/api/parser.go | 4 +-
plugins/queue/api/queue.go | 17 +-
plugins/queue/mmap/README.md | 1 -
plugins/queue/mmap/branchmark_test.go | 109 ++++++
plugins/queue/mmap/meta/meta.go | 153 ++++++++
plugins/queue/mmap/meta/meta_test.go | 208 +++++++++++
plugins/queue/mmap/queue.go | 192 ++++++++++
plugins/queue/mmap/queue_opreation.go | 236 ++++++++++++
plugins/queue/mmap/queue_test.go | 402 +++++++++++++++++++++
plugins/queue/mmap/segment/segment.go | 68 ++++
plugins/queue/mmap/segment/segment_test.go | 187 ++++++++++
plugins/queue/mmap/segment_operation.go | 142 ++++++++
plugins/queue/mmap/serializer.go | 74 ++++
plugins/queue/{api => }/queue_repository.go | 18 +-
plugins/receiver/api/receiver.go | 2 +-
31 files changed, 2004 insertions(+), 186 deletions(-)
diff --git a/Makefile b/Makefile
index dbbc0e8..9ff7285 100644
--- a/Makefile
+++ b/Makefile
@@ -25,6 +25,7 @@ RELEASE_SRC = skywalking-satellite-$(VERSION)-src
OS = $(shell uname)
GO = go
+GIT = git
GO_PATH = $$($(GO) env GOPATH)
GO_BUILD = $(GO) build
GO_GET = $(GO) get
@@ -36,7 +37,7 @@ GO_BUILD_FLAGS = -v
GO_BUILD_LDFLAGS = -X main.version=$(VERSION)
GQL_GEN = $(GO_PATH)/bin/gqlgen
-PLATFORMS := windows linux darwin
+PLATFORMS := linux darwin
os = $(word 1, $@)
ARCH = amd64
@@ -72,7 +73,7 @@ clean: tools
-rm -rf coverage.txt
.PHONY: build
-build: deps windows linux darwin
+build: deps linux darwin
.PHONY: $(PLATFORMS)
diff --git a/dist/LICENSE b/dist/LICENSE
index 618fe55..38f0d7a 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -222,7 +222,7 @@ BSD licenses
The following components are provided under a BSD license. See project link for details.
The text of each license is also included at licenses/LICENSE-[project].txt.
-
+ google (go-cmp) v0.5.4 https://github.com/google/go-cmp BSD
========================================================================
MIT licenses
@@ -234,3 +234,4 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
sirupsen (logrus) v1.7.0: https://github.com/sirupsen/logrus MIT
spf13 (viper) v1.7.1: https://github.com/spf13/viper MIT
urfave (cli) v2.3.0: https://github.com/urfave/cli MIT
+ grandecola (mmap) v0.6.0: https://github.com/grandecola/mmap MIT
diff --git a/dist/licenses/LICENSE-go-cmp b/dist/licenses/LICENSE-go-cmp
new file mode 100644
index 0000000..32017f8
--- /dev/null
+++ b/dist/licenses/LICENSE-go-cmp
@@ -0,0 +1,27 @@
+Copyright (c) 2017 The Go Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/dist/licenses/LICENSE-mmap b/dist/licenses/LICENSE-mmap
new file mode 100644
index 0000000..be28ae0
--- /dev/null
+++ b/dist/licenses/LICENSE-mmap
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Aman Mangal
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/docs/configuration/queue.md b/docs/configuration/queue.md
new file mode 100644
index 0000000..b7d81b1
--- /dev/null
+++ b/docs/configuration/queue.md
@@ -0,0 +1,11 @@
+# queue configuration
+
+| Type | Param | DefaultValue| Meaning|
+| ---- | ---- |---- | ---- |
+| mmap-queue | segment_size | 131072 | The size of each segment. The unit is Byte.
+| mmap-queue | max_in_mem_segments | 10 | The max num of segments in memory.
+| mmap-queue | queue_capacity_segments | 4000 | The capacity of Queue = segment_size * queue_capacity_segments.
+| mmap-queue | flush_period | 1000 | The flush period. The unit is ms.
+| mmap-queue | flush_ceiling_num | 10000 | The max number in one flush time.
+| mmap-queue | queue_dir | satellite-mmap-queue |The directory that contains all files of the queue.
+| mmap-queue | max_event_size | 20480 |The max size of the input event. The unit is Byte.
diff --git a/docs/plugins/queue/mmap/README.md b/docs/plugins/queue/mmap/README.md
new file mode 100644
index 0000000..9c8379c
--- /dev/null
+++ b/docs/plugins/queue/mmap/README.md
@@ -0,0 +1,66 @@
+# Design
+The mmap-queue is a big, fast, and persistent queue based on memory-mapped files. Each mmap-queue has a directory that stores all of its data. The queue directory is made up of many segments and one meta file.
+
+- Segment: A segment is where the data is actually stored. It provides large storage space while using mmap to keep the impact on read and write performance as small as possible. Segment files are reused rather than deleted.
+- Meta: The purpose of meta is to find the data that the consumer needs.
+
+## Meta
+The metadata for the pipe needs only 80B. However, for memory alignment, the meta file occupies at least one memory page, which is generally 4K.
+```
+[ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ]
+[metaVersion][ ID ][ offset][ ID ][ offset][ ID ][ offset][ ID ][ offset][capacity]
+[metaVersion][writing offset][watermark offset][committed offset][reading offset][capacity]
+
+```
+### Transforming
+
+![](https://skywalking.apache.org/blog/2020-11-25-skywalking-satellite-0.1.0-design/offset-convert.jpg)
+
+## Configuration
+[Configuration Params](../../../configuration/queue.md)
+
+## Segment
+Segments are a series of files of the same size. Each input data item costs `8Byte+Len(data)Byte` to store the raw bytes. The first 8 bytes hold `Len(data)`, which is used to find the ending position.
+### Swapper
+To balance performance and resource usage, we define a page replacement policy.
+
+- Keep the segments currently being read and written in memory.
+- When the number of mmapped segments is greater than or equal to max_in_mem_segments, we first scan the read scope and then the written scope to swap segments out, guaranteeing that the segments currently being read or written always stay in memory.
+ - Read scope: [reading_offset - max_in_mem_segments,reading_offset - 1]
+ - Written scope: [writing_offset - max_in_mem_segments,writing_offset - 1]
+ - Each swap operation guarantees that at least `max_in_mem_segments/2-1` slots become available. The subtraction accounts for the segments that must always stay in memory.
+
+## BenchmarkTest
+Base machine: macbook pro 2018 Intel Core i7 16 GB 2400 MHz DDR4 SSD
+
+### push operation
+
+```
+BenchmarkPush
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:8KB
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:8KB-12 42470 25098 ns/op 466 B/op 10 allocs/op
+
+BenchmarkPush/segmentSize:_256KB_maxInMemSegments:10_message:8KB
+BenchmarkPush/segmentSize:_256KB_maxInMemSegments:10_message:8KB-12 76905 18910 ns/op 418 B/op 10 allocs/op
+
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:20_message:8KB
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:20_message:8KB-12 58221 22258 ns/op 465 B/op 10 allocs/op
+
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:16KB
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:16KB-12 34053 48635 ns/op 566 B/op 11 allocs/op
+```
+### push and pop operation
+```
+BenchmarkPushAndPop
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:8KB
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:8KB-12 22273 45872 ns/op 19512 B/op 40 allocs/op
+
+BenchmarkPushAndPop/segmentSize:_256KB_maxInMemSegments:10_message:8KB
+BenchmarkPushAndPop/segmentSize:_256KB_maxInMemSegments:10_message:8KB-12 38874 37169 ns/op 19456 B/op 40 allocs/op
+
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:20_message:8KB
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:20_message:8KB-12 38048 36274 ns/op 19514 B/op 40 allocs/op
+
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:16KB
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:16KB-12 19768 63399 ns/op 36893 B/op 41 allocs/op
+```
diff --git a/go.mod b/go.mod
index 3bf717f..1c012a9 100644
--- a/go.mod
+++ b/go.mod
@@ -3,8 +3,9 @@ module github.com/apache/skywalking-satellite
go 1.14
require (
+ github.com/google/go-cmp v0.5.4
+ github.com/grandecola/mmap v0.6.0
github.com/sirupsen/logrus v1.7.0
github.com/spf13/viper v1.7.1
github.com/urfave/cli/v2 v2.3.0
)
-
diff --git a/go.sum b/go.sum
index e84cc08..23f05df 100644
--- a/go.sum
+++ b/go.sum
@@ -11,6 +11,7 @@ cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqCl
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
+github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
@@ -59,15 +60,22 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
+github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/grandecola/mmap v0.6.0 h1:dYrZWLay1rDlmlsGsSIoXGQ+JMu/t2ZnKt8vT1h+1o0=
+github.com/grandecola/mmap v0.6.0/go.mod h1:q5v9jpm393rcp5PXE6biArHKc2SWJBpXjfxSRtQMtNU=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
@@ -95,14 +103,17 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
+github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
@@ -150,7 +161,9 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
+github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
@@ -168,11 +181,11 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
-github.com/urfave/cli v1.22.5 h1:lNq9sAHXK2qfdI8W+GRItjCEkI+2oR4d+MEHy1CKXoU=
github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
@@ -267,6 +280,8 @@ golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc h1:NCy3Ohtk6Iny5V/reW2Ktypo4zIpWBdRJ1uFMjBxdg8=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
@@ -290,6 +305,7 @@ google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiq
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
diff --git a/internal/pkg/event/event.go b/internal/pkg/event/event.go
index a834fae..2ed10cf 100644
--- a/internal/pkg/event/event.go
+++ b/internal/pkg/event/event.go
@@ -39,56 +39,31 @@ type Type int32
// Offset is a generic form, which allows having different definitions in different Queues.
type Offset string
-// Event that implement this interface would be allowed to transmit in the Satellite.
-type Event interface {
- // Name returns the event name.
- Name() string
-
- // Meta is a pair of key and value to record meta data, such as labels.
- Meta() map[string]string
-
- // Data returns the wrapped data.
- Data() interface{}
-
- // Time returns the event time.
- Time() time.Time
-
- // Type is to distinguish different events.
- Type() Type
-
- // IsRemote means is a output event when returns true.
- IsRemote() bool
-}
-
-// SerializableEvent is used in Collector to bridge Queue.
-type SerializableEvent interface {
- Event
-
- // ToBytes serialize the event to a byte array.
- ToBytes() []byte
-
- // FromBytes deserialize the byte array to an event.
- FromBytes(bytes []byte) SerializableEvent
+type Event struct {
+ Name string
+ Timestamp time.Time
+ Meta map[string]string
+ Type Type
+ Remote bool
+ Data map[string]interface{}
}
// BatchEvents is used by Forwarder to forward.
-type BatchEvents []Event
+type BatchEvents []*Event
// OutputEventContext is a container to store the output context.
type OutputEventContext struct {
- Context map[string]Event
+ Context map[string]*Event
Offset Offset
}
-// Put puts the incoming event into the context when the event is a remote event.
-func (c *OutputEventContext) Put(event Event) {
- if event.IsRemote() {
- c.Context[event.Name()] = event
- }
+// Put puts the incoming event into the context.
+func (c *OutputEventContext) Put(event *Event) {
+ c.Context[event.Name] = event
}
-// Get returns a event in the context. When the eventName does not exist, a error would be returned.
-func (c *OutputEventContext) Get(eventName string) (Event, error) {
+// Get returns an event in the context. When the eventName does not exist, an error would be returned.
+func (c *OutputEventContext) Get(eventName string) (*Event, error) {
e, ok := c.Context[eventName]
if !ok {
err := fmt.Errorf("cannot find the event name in OutputEventContext : %s", eventName)
diff --git a/internal/satellite/event/event.go b/internal/satellite/event/event.go
deleted file mode 100644
index a53720b..0000000
--- a/internal/satellite/event/event.go
+++ /dev/null
@@ -1,109 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package event
-
-import (
- "time"
-
- "github.com/apache/skywalking-satellite/internal/pkg/event"
-)
-
-// Event defines the common fields.
-type Event struct {
- name string
- timestamp time.Time
- meta map[string]string
- eventType event.Type
- remote bool
-}
-
-// ETBFunc serialize event to bytes.
-type ETBFunc func(event event.SerializableEvent) []byte
-
-// BToFunc deserialize bytes to event.
-type BToFunc func(bytes []byte) event.SerializableEvent
-
-// StructuredEvent works when the data is a struct type.
-type StructuredEvent struct {
- Event
- data interface{}
-}
-
-// StructuredEvent works when the data is not a struct type.
-type UnstructuredEvent struct {
- Event
- data map[string]interface{}
-}
-
-// StructuredEvent works when the data is a struct type in the collector.
-type StructuredSerializableEvent struct {
- StructuredEvent
- etb ETBFunc
- bte BToFunc
-}
-
-// StructuredEvent works when the data is not a struct type in the collector.
-type UnstructuredSerializableEvent struct {
- UnstructuredEvent
- etb ETBFunc
- bte BToFunc
-}
-
-func (s *Event) Name() string {
- return s.name
-}
-
-func (s *Event) Meta() map[string]string {
- return s.meta
-}
-
-func (s *Event) Time() time.Time {
- return s.timestamp
-}
-
-func (s *Event) Type() event.Type {
- return s.eventType
-}
-
-func (s *Event) IsRemote() bool {
- return s.remote
-}
-
-func (u *StructuredEvent) Data() interface{} {
- return u.data
-}
-
-func (u *UnstructuredEvent) Data() interface{} {
- return u.data
-}
-
-func (s *StructuredSerializableEvent) ToBytes() []byte {
- return s.etb(s)
-}
-
-func (s *StructuredSerializableEvent) FromBytes(bytes []byte) event.SerializableEvent {
- return s.bte(bytes)
-}
-
-func (u *UnstructuredSerializableEvent) ToBytes() []byte {
- return u.etb(u)
-}
-
-func (u *UnstructuredSerializableEvent) FromBytes(bytes []byte) event.SerializableEvent {
- return u.bte(bytes)
-}
diff --git a/internal/satellite/module/gatherer/fetcher_gatherer.go b/internal/satellite/module/gatherer/fetcher_gatherer.go
index 77ac6d0..f218657 100644
--- a/internal/satellite/module/gatherer/fetcher_gatherer.go
+++ b/internal/satellite/module/gatherer/fetcher_gatherer.go
@@ -47,7 +47,7 @@ func (f *FetcherGatherer) Prepare() error {
func (f *FetcherGatherer) Boot(ctx context.Context) {
var wg sync.WaitGroup
- wg.Add(1)
+ wg.Add(2)
go func() {
defer wg.Done()
timeTicker := time.NewTicker(time.Duration(f.config.FetchInterval) * time.Millisecond)
@@ -55,21 +55,37 @@ func (f *FetcherGatherer) Boot(ctx context.Context) {
select {
case <-timeTicker.C:
events := f.runningFetcher.Fetch()
- for _, event := range events {
- err := f.runningQueue.Push(event)
+ for _, e := range events {
+ err := f.runningQueue.Push(e)
if err != nil {
// todo add abandonedCount metrics
log.Logger.Errorf("cannot put event into queue in %s namespace, %v", f.config.NamespaceName, err)
}
}
- case e := <-f.runningQueue.Pop():
- f.outputChannel <- e
case <-ctx.Done():
f.Shutdown()
return
}
}
}()
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case <-ctx.Done():
+ f.Shutdown()
+ return
+ default:
+ if e, err := f.runningQueue.Pop(); err == nil {
+ f.outputChannel <- e
+ } else {
+ log.Logger.Errorf("error in popping from the queue: %v", err)
+ time.Sleep(time.Second)
+ }
+ }
+ }
+ }()
wg.Wait()
}
diff --git a/internal/satellite/module/gatherer/receiver_gatherer.go b/internal/satellite/module/gatherer/receiver_gatherer.go
index 5735474..de812a5 100644
--- a/internal/satellite/module/gatherer/receiver_gatherer.go
+++ b/internal/satellite/module/gatherer/receiver_gatherer.go
@@ -20,6 +20,7 @@ package gatherer
import (
"context"
"sync"
+ "time"
"github.com/apache/skywalking-satellite/internal/pkg/event"
"github.com/apache/skywalking-satellite/internal/pkg/log"
@@ -45,7 +46,7 @@ type ReceiverGatherer struct {
func (r *ReceiverGatherer) Prepare() error {
log.Logger.Infof("receiver gatherer module of %s namespace is preparing", r.config.NamespaceName)
r.runningReceiver.RegisterHandler(r.runningServer)
- if err := r.runningQueue.Prepare(); err != nil {
+ if err := r.runningQueue.Initialize(); err != nil {
log.Logger.Infof("the %s queue of %s namespace was failed to initialize", r.runningQueue.Name(), r.config.NamespaceName)
return err
}
@@ -54,7 +55,7 @@ func (r *ReceiverGatherer) Prepare() error {
func (r *ReceiverGatherer) Boot(ctx context.Context) {
var wg sync.WaitGroup
- wg.Add(1)
+ wg.Add(2)
go func() {
defer wg.Done()
for {
@@ -65,14 +66,30 @@ func (r *ReceiverGatherer) Boot(ctx context.Context) {
// todo add abandonedCount metrics
log.Logger.Errorf("cannot put event into queue in %s namespace, error is: %v", r.config.NamespaceName, err)
}
- case e := <-r.runningQueue.Pop():
- r.outputChannel <- e
case <-ctx.Done():
r.Shutdown()
return
}
}
}()
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case <-ctx.Done():
+ r.Shutdown()
+ return
+ default:
+ if e, err := r.runningQueue.Pop(); err == nil {
+ r.outputChannel <- e
+ } else {
+ log.Logger.Errorf("error in popping from the queue: %v", err)
+ time.Sleep(time.Second)
+ }
+ }
+ }
+ }()
wg.Wait()
}
diff --git a/internal/satellite/module/processor/processor.go b/internal/satellite/module/processor/processor.go
index 640f605..974741a 100644
--- a/internal/satellite/module/processor/processor.go
+++ b/internal/satellite/module/processor/processor.go
@@ -60,7 +60,7 @@ func (p *Processor) Boot(ctx context.Context) {
case e := <-p.gatherer.OutputDataChannel():
c := &event.OutputEventContext{
Offset: e.Offset,
- Context: make(map[string]event.Event),
+ Context: make(map[string]*event.Event),
}
c.Put(e.Event)
// processing the event with filters, that put the necessary events to OutputEventContext.
diff --git a/internal/satellite/module/sender/sender.go b/internal/satellite/module/sender/sender.go
index 19a469b..37c8b1c 100644
--- a/internal/satellite/module/sender/sender.go
+++ b/internal/satellite/module/sender/sender.go
@@ -134,8 +134,8 @@ func (s *Sender) consume(batch *buffer.BatchBuffer) {
for i := 0; i < batch.Len(); i++ {
eventContext := batch.Buf()[i]
for _, e := range eventContext.Context {
- if e.IsRemote() {
- events[e.Type()] = append(events[e.Type()], e)
+ if e.Remote {
+ events[e.Type] = append(events[e.Type], e)
}
}
}
diff --git a/plugins/fetcher/api/fetcher.go b/plugins/fetcher/api/fetcher.go
index 4021473..83139a9 100644
--- a/plugins/fetcher/api/fetcher.go
+++ b/plugins/fetcher/api/fetcher.go
@@ -27,5 +27,5 @@ type Fetcher interface {
plugin.Plugin
// Fetch would fetch some APM data.
- Fetch() []event.SerializableEvent
+ Fetch() event.BatchEvents
}
diff --git a/plugins/init.go b/plugins/init.go
index 2bf3bf4..97f0177 100644
--- a/plugins/init.go
+++ b/plugins/init.go
@@ -24,7 +24,7 @@ import (
filter "github.com/apache/skywalking-satellite/plugins/filter/api"
forwarder "github.com/apache/skywalking-satellite/plugins/forwarder/api"
parser "github.com/apache/skywalking-satellite/plugins/parser/api"
- queue "github.com/apache/skywalking-satellite/plugins/queue/api"
+ "github.com/apache/skywalking-satellite/plugins/queue"
receiver "github.com/apache/skywalking-satellite/plugins/receiver/api"
server "github.com/apache/skywalking-satellite/plugins/server/api"
)
diff --git a/plugins/parser/api/parser.go b/plugins/parser/api/parser.go
index 754037f..3e09e3a 100644
--- a/plugins/parser/api/parser.go
+++ b/plugins/parser/api/parser.go
@@ -27,8 +27,8 @@ type Parser interface {
plugin.Plugin
// ParseBytes parse the byte buffer into events.
- ParseBytes(bytes []byte) ([]event.SerializableEvent, error)
+ ParseBytes(bytes []byte) (event.BatchEvents, error)
// ParseStr parse the string into events.
- ParseStr(str string) ([]event.SerializableEvent, error)
+ ParseStr(str string) (event.BatchEvents, error)
}
diff --git a/plugins/queue/api/queue.go b/plugins/queue/api/queue.go
index f25b2ff..a5ad163 100644
--- a/plugins/queue/api/queue.go
+++ b/plugins/queue/api/queue.go
@@ -18,6 +18,8 @@
package api
import (
+ "reflect"
+
"github.com/apache/skywalking-satellite/internal/pkg/event"
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
)
@@ -26,14 +28,14 @@ import (
type Queue interface {
plugin.Plugin
- // Prepare creates the queue.
- Prepare() error
+ // Initialize creates the queue.
+ Initialize() error
// Push a inputEvent into the queue.
- Push(event event.SerializableEvent) error
+ Push(event *event.Event) error
// Pop returns a SequenceEvent when Queue is not empty,
- Pop() chan *SequenceEvent
+ Pop() (*SequenceEvent, error)
// Close would close the queue.
Close() error
@@ -44,6 +46,11 @@ type Queue interface {
// SequenceEvent is a wrapper to pass the event and the offset.
type SequenceEvent struct {
- Event event.Event
+ Event *event.Event
Offset event.Offset
}
+
+// GetQueue returns the queue plugin that plugin.Get resolves for the given config.
+// The result is type-asserted to Queue without a check.
+func GetQueue(config plugin.Config) Queue {
+ return plugin.Get(reflect.TypeOf((*Queue)(nil)).Elem(), config).(Queue)
+}
diff --git a/plugins/queue/mmap/README.md b/plugins/queue/mmap/README.md
deleted file mode 100644
index 3f03ea1..0000000
--- a/plugins/queue/mmap/README.md
+++ /dev/null
@@ -1 +0,0 @@
-# Plugin description
\ No newline at end of file
diff --git a/plugins/queue/mmap/branchmark_test.go b/plugins/queue/mmap/branchmark_test.go
new file mode 100644
index 0000000..4bf9ee4
--- /dev/null
+++ b/plugins/queue/mmap/branchmark_test.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+ "fmt"
+ "os"
+ "testing"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ "github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+// benchmarkParam is one benchmark scenario: the queue settings and the message size to push.
+type benchmarkParam struct {
+ segmentSize int // passed to the queue as "segment_size"
+ message int // unit KB
+ maxInMemSegments int // passed to the queue as "max_in_mem_segments"
+}
+
+// params lists the scenarios run by the benchmarks in this file. Each entry after
+// the first differs from the first entry in exactly one setting.
+var params = []benchmarkParam{
+ {segmentSize: 1024 * 128, message: 8, maxInMemSegments: 10},
+ // compare the influence of the segmentSize.
+ {segmentSize: 1024 * 256, message: 8, maxInMemSegments: 10},
+ // compare the influence of the maxInMemSegments.
+ {segmentSize: 1024 * 128, message: 8, maxInMemSegments: 20},
+ // compare the influence of the message size.
+ {segmentSize: 1024 * 128, message: 16, maxInMemSegments: 10},
+}
+
+// cleanBenchmarkQueue deletes the directory of the given mmap queue and reports
+// a benchmark error when the removal fails.
+func cleanBenchmarkQueue(b *testing.B, q api.Queue) {
+	dir := q.(*Queue).QueueDir
+	err := os.RemoveAll(dir)
+	if err == nil {
+		return
+	}
+	b.Errorf("cannot remove test queue dir, %v", err)
+}
+
+// BenchmarkPush measures Push on a fresh mmap queue for every scenario in params.
+func BenchmarkPush(b *testing.B) {
+	for _, param := range params {
+		name := fmt.Sprintf("segmentSize: %dKB maxInMemSegments:%d message:%dKB", param.segmentSize/1024, param.maxInMemSegments, param.message)
+		b.Run(name, func(b *testing.B) {
+			q, err := initMmapQueue(plugin.Config{
+				"queue_dir":               "BenchmarkPush",
+				"segment_size":            param.segmentSize,
+				"max_in_mem_segments":     param.maxInMemSegments,
+				"queue_capacity_segments": 10000,
+			})
+			if err != nil {
+				b.Fatalf("cannot get a mmap queue: %v", err)
+			}
+			// Deferred so the queue is closed and its directory removed even when
+			// b.Fatalf aborts the run; otherwise the next scenario reuses stale files.
+			defer func() {
+				_ = q.Close()
+				cleanBenchmarkQueue(b, q)
+			}()
+			event := getLargeEvent(param.message)
+			// Print the separator line before the timer is reset so it is not measured.
+			println()
+			b.ReportAllocs()
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				if err := q.Push(event); err != nil {
+					b.Fatalf("error in pushing: %v", err)
+				}
+			}
+			b.StopTimer()
+		})
+	}
+}
+
+// BenchmarkPushAndPop measures one Push followed by one Pop on a fresh mmap queue
+// for every scenario in params.
+func BenchmarkPushAndPop(b *testing.B) {
+	for _, param := range params {
+		name := fmt.Sprintf("segmentSize: %dKB maxInMemSegments:%d message:%dKB", param.segmentSize/1024, param.maxInMemSegments, param.message)
+		b.Run(name, func(b *testing.B) {
+			q, err := initMmapQueue(plugin.Config{
+				"queue_dir":           "BenchmarkPushAndPop",
+				"segment_size":        param.segmentSize,
+				"max_in_mem_segments": param.maxInMemSegments,
+			})
+			if err != nil {
+				b.Fatalf("cannot get a mmap queue: %v", err)
+			}
+			// Deferred so the queue is closed and its directory removed even when
+			// b.Fatalf aborts the run; otherwise the next scenario reuses stale files.
+			defer func() {
+				_ = q.Close()
+				cleanBenchmarkQueue(b, q)
+			}()
+			event := getLargeEvent(param.message)
+			// Print the separator line before the timer is reset so it is not measured.
+			println()
+			b.ReportAllocs()
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				if err := q.Push(event); err != nil {
+					b.Fatalf("error in pushing: %v", err)
+				}
+				if _, err := q.Pop(); err != nil {
+					b.Fatalf("error in popping: %v", err)
+				}
+			}
+			b.StopTimer()
+		})
+	}
+}
diff --git a/plugins/queue/mmap/meta/meta.go b/plugins/queue/mmap/meta/meta.go
new file mode 100644
index 0000000..cfa32a3
--- /dev/null
+++ b/plugins/queue/mmap/meta/meta.go
@@ -0,0 +1,153 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package meta
+
+import (
+ "fmt"
+ "syscall"
+
+ "path/filepath"
+
+ "github.com/grandecola/mmap"
+
+ "github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// Settings of the metadata file.
+const (
+ metaSize = 80 // size passed to segment.NewSegment when opening the metadata file
+ metaName = "meta.dat" // file name of the metadata file inside the metadata directory
+ metaVersion = 1 // the only non-zero stored version that NewMetaData accepts
+)
+
+// Metadata only needs 80B to store the Metadata for the pipe. But for memory alignment,
+// it takes at least one memory page size, which is generally 4K.
+//
+// The accessors treat the file as ten 8-byte (uint64) slots, addressed by byte offset:
+//
+// offset  0: metaVersion
+// offset  8: writing segment ID      offset 16: writing offset
+// offset 24: watermark segment ID    offset 32: watermark offset
+// offset 40: committed segment ID    offset 48: committed offset
+// offset 56: reading segment ID      offset 64: reading offset
+// offset 72: capacity
+type Metadata struct {
+ metaFile *mmap.File // the memory mapped metadata file
+ name string // set to metaName by NewMetaData
+ size int // set to metaSize by NewMetaData
+ capacity int // the capacity passed to NewMetaData
+}
+
+// NewMetaData opens the metadata file "meta.dat" under metaDir through
+// segment.NewSegment and validates it against metaVersion and capacity.
+//
+// A stored version or capacity of 0 is accepted as-is. Any other stored value must
+// equal metaVersion / capacity; otherwise the mapped file is released and an error
+// is returned. On success the version and capacity are written to the file and the
+// Metadata is returned.
+func NewMetaData(metaDir string, capacity int) (*Metadata, error) {
+	path := filepath.Join(metaDir, metaName)
+	metaFile, err := segment.NewSegment(path, metaSize)
+	if err != nil {
+		return nil, fmt.Errorf("error in creating the Metadata memory mapped file: %v", err)
+	}
+
+	m := &Metadata{
+		metaFile: metaFile,
+		name:     metaName,
+		size:     metaSize,
+		capacity: capacity,
+	}
+
+	if v := m.GetVersion(); v != 0 && v != metaVersion {
+		// The file cannot be used: unmap it so the mapping is not leaked.
+		// The validation error is the one worth reporting, so the unmap error is dropped.
+		_ = metaFile.Unmap()
+		return nil, fmt.Errorf("metadata metaVersion is not matching, the Metadata metaVersion is %d", v)
+	}
+	if c := m.GetCapacity(); c != 0 && c != capacity {
+		// Same as above: release the mapping before reporting the mismatch.
+		_ = metaFile.Unmap()
+		return nil, fmt.Errorf("metadata capacity is not equal to the old capacity, the old capacity is %d", c)
+	}
+	m.PutVersion(metaVersion)
+	m.PutCapacity(int64(capacity))
+	return m, nil
+}
+
+// GetVersion reads the meta version stored at byte offset 0 of the file.
+func (m *Metadata) GetVersion() int {
+	version := m.metaFile.ReadUint64At(0)
+	return int(version)
+}
+
+// PutVersion stores version at byte offset 0 of the memory mapped file.
+func (m *Metadata) PutVersion(version int64) {
+	raw := uint64(version)
+	m.metaFile.WriteUint64At(raw, 0)
+}
+
+// GetWritingOffset reads the writing position: the segment ID at byte offset 8
+// and the offset inside that segment at byte offset 16.
+func (m *Metadata) GetWritingOffset() (segmentID, offset int64) {
+	segmentID = int64(m.metaFile.ReadUint64At(8))
+	offset = int64(m.metaFile.ReadUint64At(16))
+	return segmentID, offset
+}
+
+// PutWritingOffset stores the writing position: segmentID at byte offset 8
+// and offset at byte offset 16.
+func (m *Metadata) PutWritingOffset(segmentID, offset int64) {
+	rawID, rawOffset := uint64(segmentID), uint64(offset)
+	m.metaFile.WriteUint64At(rawID, 8)
+	m.metaFile.WriteUint64At(rawOffset, 16)
+}
+
+// GetWatermarkOffset reads the watermark position: the segment ID at byte offset 24
+// and the offset inside that segment at byte offset 32.
+func (m *Metadata) GetWatermarkOffset() (segmentID, offset int64) {
+	segmentID = int64(m.metaFile.ReadUint64At(24))
+	offset = int64(m.metaFile.ReadUint64At(32))
+	return segmentID, offset
+}
+
+// PutWatermarkOffset stores the watermark position: segmentID at byte offset 24
+// and offset at byte offset 32.
+func (m *Metadata) PutWatermarkOffset(segmentID, offset int64) {
+	rawID, rawOffset := uint64(segmentID), uint64(offset)
+	m.metaFile.WriteUint64At(rawID, 24)
+	m.metaFile.WriteUint64At(rawOffset, 32)
+}
+
+// GetCommittedOffset reads the committed position: the segment ID at byte offset 40
+// and the offset inside that segment at byte offset 48.
+func (m *Metadata) GetCommittedOffset() (segmentID, offset int64) {
+	segmentID = int64(m.metaFile.ReadUint64At(40))
+	offset = int64(m.metaFile.ReadUint64At(48))
+	return segmentID, offset
+}
+
+// PutCommittedOffset stores the committed position: segmentID at byte offset 40
+// and offset at byte offset 48.
+func (m *Metadata) PutCommittedOffset(segmentID, offset int64) {
+	rawID, rawOffset := uint64(segmentID), uint64(offset)
+	m.metaFile.WriteUint64At(rawID, 40)
+	m.metaFile.WriteUint64At(rawOffset, 48)
+}
+
+// GetReadingOffset reads the reading position: the segment ID at byte offset 56
+// and the offset inside that segment at byte offset 64.
+func (m *Metadata) GetReadingOffset() (segmentID, offset int64) {
+	segmentID = int64(m.metaFile.ReadUint64At(56))
+	offset = int64(m.metaFile.ReadUint64At(64))
+	return segmentID, offset
+}
+
+// PutReadingOffset stores the reading position: segmentID at byte offset 56
+// and offset at byte offset 64.
+func (m *Metadata) PutReadingOffset(segmentID, offset int64) {
+	rawID, rawOffset := uint64(segmentID), uint64(offset)
+	m.metaFile.WriteUint64At(rawID, 56)
+	m.metaFile.WriteUint64At(rawOffset, 64)
+}
+
+// GetCapacity reads the queue capacity stored at byte offset 72 of the file.
+func (m *Metadata) GetCapacity() int {
+	stored := m.metaFile.ReadUint64At(72)
+	return int(stored)
+}
+
+// PutCapacity stores capacity at byte offset 72 of the memory mapped file.
+func (m *Metadata) PutCapacity(capacity int64) {
+	m.metaFile.WriteUint64At(uint64(capacity), 72)
+}
+
+// Flush calls Flush on the memory mapped metadata file with syscall.MS_SYNC
+// and returns its error.
+func (m *Metadata) Flush() error {
+	const flags = syscall.MS_SYNC
+	return m.metaFile.Flush(flags)
+}
+
+// Close flushes the metadata and then unmaps the memory mapped file.
+// When the flush fails, its error is returned and the file stays mapped.
+func (m *Metadata) Close() error {
+	err := m.Flush()
+	if err == nil {
+		err = m.metaFile.Unmap()
+	}
+	return err
+}
diff --git a/plugins/queue/mmap/meta/meta_test.go b/plugins/queue/mmap/meta/meta_test.go
new file mode 100644
index 0000000..fcc6b57
--- /dev/null
+++ b/plugins/queue/mmap/meta/meta_test.go
@@ -0,0 +1,208 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package meta
+
+import (
+ "os"
+ "reflect"
+ "testing"
+)
+
+// args holds every value the test writes to, or reads back from, a Metadata file.
+type args struct {
+ metaVersion int
+ capacity int
+ // ID
+ writingSegID int64
+ readingSegID int64
+ committedSegID int64
+ watermarkSegID int64
+ // offset
+ writingSegOffset int64
+ readingSegOffset int64
+ committedSegOffset int64
+ watermarkSegOffset int64
+}
+
+// tests is one table entry of Test_newMetaData.
+type tests struct {
+ name string // sub-test name passed to t.Run
+ args args // values written before the file is reopened
+ want args // values expected after the file is reopened
+ wantErr bool // whether an error from reopening the file is accepted
+}
+
+// Test_newMetaData writes the values of each case through a fresh Metadata, closes it,
+// reopens the file with the capacity of the case and then either accepts the expected
+// error or compares the values read back with the wanted ones.
+func Test_newMetaData(t *testing.T) {
+	const testDir = "testMeta"
+	const preCapacity = 500
+	params := []tests{
+		buildMetaverionErrorTest(),
+		buildNormalTest(),
+		buildCapacitynErrorTest(),
+	}
+
+	for _, tt := range params {
+		t.Run(tt.name, func(t *testing.T) {
+			// clean
+			defer func() {
+				if err := os.RemoveAll(testDir); err != nil {
+					t.Errorf("remove Metadata dir error: %v", err)
+				}
+			}()
+
+			got, err := NewMetaData(testDir, preCapacity)
+			if err != nil {
+				t.Errorf("cannot create Metadata file: %v", err)
+				return
+			}
+
+			// write args
+			got.PutVersion(int64(tt.args.metaVersion))
+			got.PutWritingOffset(tt.args.writingSegID, tt.args.writingSegOffset)
+			got.PutReadingOffset(tt.args.readingSegID, tt.args.readingSegOffset)
+			got.PutCommittedOffset(tt.args.committedSegID, tt.args.committedSegOffset)
+			got.PutWatermarkOffset(tt.args.watermarkSegID, tt.args.watermarkSegOffset)
+			// Report the error returned by Close itself; the outer err is nil here.
+			if err := got.Close(); err != nil {
+				t.Errorf("cannot Close the Metadata file: %v", err)
+				return
+			}
+
+			oldMeta, err := NewMetaData(testDir, tt.args.capacity)
+			if err != nil {
+				if tt.wantErr {
+					return
+				}
+				t.Errorf("cannot read old Metadata file: %v", err)
+				return
+			}
+			// Release the mapping before the deferred directory removal runs.
+			defer func() {
+				if err := oldMeta.Close(); err != nil {
+					t.Errorf("cannot Close the reopened Metadata file: %v", err)
+				}
+			}()
+
+			// read args
+			wmID, wmOffset := oldMeta.GetWatermarkOffset()
+			cID, cOffset := oldMeta.GetCommittedOffset()
+			rID, rOffset := oldMeta.GetReadingOffset()
+			wID, wOffset := oldMeta.GetWritingOffset()
+
+			readArgs := args{
+				metaVersion:        oldMeta.GetVersion(),
+				capacity:           oldMeta.GetCapacity(),
+				watermarkSegID:     wmID,
+				writingSegID:       wID,
+				readingSegID:       rID,
+				committedSegID:     cID,
+				watermarkSegOffset: wmOffset,
+				readingSegOffset:   rOffset,
+				committedSegOffset: cOffset,
+				writingSegOffset:   wOffset,
+			}
+			if !reflect.DeepEqual(readArgs, tt.want) {
+				t.Errorf("want meta info is [%+v]\n ,but got [%+v]", tt.want, readArgs)
+			}
+		})
+	}
+}
+
+// buildMetaverionErrorTest builds the case that stores meta version 2, for which
+// an error from reopening the metadata is accepted.
+func buildMetaverionErrorTest() tests {
+	// The written and the wanted values are identical in this case.
+	values := args{
+		metaVersion:        2,
+		capacity:           500,
+		watermarkSegID:     1,
+		writingSegID:       2,
+		readingSegID:       3,
+		committedSegID:     4,
+		watermarkSegOffset: 10,
+		readingSegOffset:   20,
+		committedSegOffset: 30,
+		writingSegOffset:   40,
+	}
+	return tests{
+		name:    "wrong version",
+		args:    values,
+		want:    values,
+		wantErr: true,
+	}
+}
+
+// buildCapacitynErrorTest builds the case that reopens the metadata with capacity 600,
+// for which an error from reopening is accepted.
+func buildCapacitynErrorTest() tests {
+	// The written and the wanted values are identical in this case.
+	values := args{
+		metaVersion:        1,
+		capacity:           600,
+		watermarkSegID:     1,
+		writingSegID:       2,
+		readingSegID:       3,
+		committedSegID:     4,
+		watermarkSegOffset: 10,
+		readingSegOffset:   20,
+		committedSegOffset: 30,
+		writingSegOffset:   40,
+	}
+	return tests{
+		// Named after what the case exercises so it is not confused with the version case.
+		name:    "wrong capacity",
+		args:    values,
+		want:    values,
+		wantErr: true,
+	}
+}
+
+// buildNormalTest builds the success case: meta version 1 and capacity 500, with the
+// values read back expected to equal the values written.
+func buildNormalTest() tests {
+	// The written and the wanted values are identical in this case.
+	values := args{
+		metaVersion:        1,
+		capacity:           500,
+		watermarkSegID:     2,
+		writingSegID:       3,
+		readingSegID:       4,
+		committedSegID:     5,
+		watermarkSegOffset: 6,
+		readingSegOffset:   7,
+		committedSegOffset: 8,
+		writingSegOffset:   9,
+	}
+	return tests{
+		name: "correct version",
+		args: values,
+		want: values,
+		// No error is expected here; with true, a failing reopen would pass silently.
+		wantErr: false,
+	}
+}
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
new file mode 100644
index 0000000..e699420
--- /dev/null
+++ b/plugins/queue/mmap/queue.go
@@ -0,0 +1,192 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "os"
+ "sync"
+
+ "github.com/grandecola/mmap"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/plugins/queue/api"
+ "github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+)
+
+// Queue is a memory mapped queue to store the input data.
+// The exported fields carry mapstructure tags matching the keys in DefaultConfig.
+// Initialize creates the metadata, the segment slice, the channels, the context,
+// the buffer pool, the encoder and the decoder.
+type Queue struct {
+ sync.Mutex
+ // config
+ SegmentSize int `mapstructure:"segment_size"` // The size of each segment. The unit is byte.
+ MaxInMemSegments int `mapstructure:"max_in_mem_segments"` // The max num of segments in memory.
+ QueueCapacitySegments int `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+ FlushPeriod int `mapstructure:"flush_period"` // The period flush time. The unit is ms.
+ FlushCeilingNum int `mapstructure:"flush_ceiling_num"` // The max number in one flush time.
+ MaxEventSize int `mapstructure:"max_event_size"` // The max size of the input event.
+ QueueDir string `mapstructure:"queue_dir"` // Contains all files in the queue.
+
+ // running components
+ meta *meta.Metadata // The metadata file.
+ segments []*mmap.File // The data files.
+ mmapCount int // The number of the memory mapped files.
+ unflushedNum int // The unflushed number.
+ flushChannel chan struct{} // The flushChannel channel would receive a signal when the unflushedNum reach the flush_ceiling_num.
+ insufficientMemChannel chan struct{} // Notify when memory is insufficient
+ sufficientMemChannel chan struct{} // Notify when memory is sufficient
+
+ // control components
+ ctx context.Context // Parent ctx
+ cancel context.CancelFunc // Parent ctx cancel function
+ showDownWg sync.WaitGroup // The shutdown wait group.
+
+ bufPool *sync.Pool // Pool of *bytes.Buffer, created in Initialize.
+
+ encoder *Encoder // Serializes events in Push.
+ decoder *Decoder // Deserializes data in Pop.
+}
+
+// Name returns the plugin name of the mmap queue.
+func (q *Queue) Name() string {
+	const pluginName = "mmap-queue"
+	return pluginName
+}
+
+// Description returns a short description of the plugin.
+func (q *Queue) Description() string {
+	const pluginDescription = "this is a mmap queue"
+	return pluginDescription
+}
+
+// DefaultConfig returns the default settings of the mmap queue as a config string.
+// Its keys match the mapstructure tags of the exported Queue fields.
+func (q *Queue) DefaultConfig() string {
+ return `
+# The size of each segment. Default value is 128K. The unit is Byte.
+segment_size: 131072
+# The max num of segments in memory. Default value is 10.
+max_in_mem_segments: 10
+# The capacity of Queue = segment_size * queue_capacity_segments.
+queue_capacity_segments: 4000
+# The period flush time. The unit is ms. Default value is 1 second.
+flush_period: 1000
+# The max number in one flush time. Default value is 10000.
+flush_ceiling_num: 10000
+# Contains all files in the queue.
+queue_dir: satellite-mmap-queue
+# The max size of the input event. Default value is 20k.
+max_event_size: 20480
+`
+}
+
+// Initialize prepares the queue for use. It builds the encoder, decoder and buffer pool,
+// normalizes SegmentSize and MaxInMemSegments, loads the metadata, resets the writing and
+// reading offsets to the watermark and committed offsets, loads the segments those offsets
+// point at, and starts the segmentSwapper and flush goroutines.
+// It returns an error when the metadata or one of the two segments cannot be obtained.
+func (q *Queue) Initialize() error {
+ q.encoder = NewEncoder()
+ q.decoder = NewDecoder()
+
+ q.bufPool = &sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }}
+ // the size of each segment file should be a multiple of the page size.
+ pageSize := os.Getpagesize()
+ if q.SegmentSize%pageSize != 0 {
+ q.SegmentSize -= q.SegmentSize % pageSize
+ }
+ // fall back to 131072 bytes when the rounded size is smaller than one page.
+ if q.SegmentSize/pageSize == 0 {
+ q.SegmentSize = 131072
+ }
+ // the minimum MaxInMemSegments value should be 4.
+ if q.MaxInMemSegments < 4 {
+ q.MaxInMemSegments = 4
+ }
+ // load metadata and override the reading or writing offset by the committed or watermark offset.
+ md, err := meta.NewMetaData(q.QueueDir, q.QueueCapacitySegments)
+ if err != nil {
+ return fmt.Errorf("error in creating the metadata: %v", err)
+ }
+ q.meta = md
+ cmID, cmOffset := md.GetCommittedOffset()
+ wmID, wmOffset := md.GetWatermarkOffset()
+ md.PutWritingOffset(wmID, wmOffset)
+ md.PutReadingOffset(cmID, cmOffset)
+ // keep the reading or writing segments in the memory.
+ q.segments = make([]*mmap.File, q.QueueCapacitySegments)
+ if _, err := q.GetSegment(cmID); err != nil {
+ return err
+ }
+ if _, err := q.GetSegment(wmID); err != nil {
+ return err
+ }
+ // init components
+ // all three channels are unbuffered.
+ q.insufficientMemChannel = make(chan struct{})
+ q.sufficientMemChannel = make(chan struct{})
+ q.flushChannel = make(chan struct{})
+ q.ctx, q.cancel = context.WithCancel(context.Background())
+ // async supported processes.
+ // one wait group slot per goroutine started below; Close waits on the group.
+ q.showDownWg.Add(2)
+ go q.segmentSwapper()
+ go q.flush()
+ return nil
+}
+
+// Push serializes e and appends the bytes to the queue. It returns the serialization
+// error, an error when the serialized size exceeds MaxEventSize, or the error of the
+// internal push.
+func (q *Queue) Push(e *event.Event) error {
+	data, err := q.encoder.serialize(e)
+	switch {
+	case err != nil:
+		return err
+	case len(data) > q.MaxEventSize:
+		return fmt.Errorf("cannot push the event to the queue because the size %dB is over ceiling", len(data))
+	}
+	return q.push(data)
+}
+
+// Pop reads the next record from the queue, deserializes it and returns it together
+// with the encoded position returned by the internal pop. Errors from reading or
+// deserializing are returned unchanged.
+func (q *Queue) Pop() (*api.SequenceEvent, error) {
+	raw, segmentID, segmentOffset, err := q.pop()
+	if err != nil {
+		return nil, err
+	}
+	decoded, err := q.decoder.deserialize(raw)
+	if err != nil {
+		return nil, err
+	}
+	seq := &api.SequenceEvent{
+		Event:  decoded,
+		Offset: q.encodeOffset(segmentID, segmentOffset),
+	}
+	return seq, nil
+}
+
+// Close cancels the queue context, waits for the background goroutines, unmaps every
+// loaded segment and closes the metadata. Unmap and close failures are only logged;
+// the returned error is always nil.
+func (q *Queue) Close() error {
+	q.cancel()
+	q.showDownWg.Wait()
+	for i, segment := range q.segments {
+		if segment == nil {
+			continue
+		}
+		if err := segment.Unmap(); err != nil {
+			log.Logger.Errorf("cannot unmap the segments: %d, %v", i, err)
+		}
+	}
+	err := q.meta.Close()
+	if err != nil {
+		log.Logger.Errorf("cannot unmap the metadata: %v", err)
+	}
+	return nil
+}
+
+// Ack stores lastOffset as the committed offset in the metadata.
+// An offset that cannot be decoded is logged and ignored.
+func (q *Queue) Ack(lastOffset event.Offset) {
+	id, offset, err := q.decodeOffset(lastOffset)
+	if err != nil {
+		// decodeOffset returns 0, 0 on failure; committing that would move the
+		// committed offset back to segment 0, so keep the previous value instead.
+		log.Logger.Errorf("cannot ack queue with the offset:%s, %v", lastOffset, err)
+		return
+	}
+	q.meta.PutCommittedOffset(id, offset)
+}
diff --git a/plugins/queue/mmap/queue_opreation.go b/plugins/queue/mmap/queue_opreation.go
new file mode 100644
index 0000000..b73321e
--- /dev/null
+++ b/plugins/queue/mmap/queue_opreation.go
@@ -0,0 +1,236 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+)
+
+// uInt64Size is the number of bytes by which readLength and writeLength advance the
+// offset after reading or writing a record length.
+const uInt64Size = 8
+
+// flush runs until the queue context is cancelled and calls doFlush whenever
+// flushChannel receives a signal, FlushPeriod milliseconds pass without a flush,
+// or the context is done (one final flush before returning).
+func (q *Queue) flush() {
+	defer q.showDownWg.Done()
+	ctx, cancel := context.WithCancel(q.ctx)
+	defer cancel()
+	period := time.Duration(q.FlushPeriod) * time.Millisecond
+	for {
+		// A fresh ticker per round restarts the period after every flush, as before.
+		timeTicker := time.NewTicker(period)
+		done := false
+		select {
+		case <-q.flushChannel:
+		case <-timeTicker.C:
+		case <-ctx.Done():
+			done = true
+		}
+		// Stop the ticker of this round; previously every round left one running.
+		timeTicker.Stop()
+		q.doFlush()
+		if done {
+			return
+		}
+	}
+}
+
+// doFlush flushes every loaded segment to disk, copies the writing offset to the
+// watermark offset and flushes the metadata, all under the queue lock.
+// Flush failures are logged and not returned.
+func (q *Queue) doFlush() {
+	q.Lock()
+	defer q.Unlock()
+	segments := q.segments
+	for i := range segments {
+		if seg := segments[i]; seg != nil {
+			if err := seg.Flush(syscall.MS_SYNC); err != nil {
+				log.Logger.Errorf("cannot flush segment file: %v", err)
+			}
+		}
+	}
+	segmentID, segmentOffset := q.meta.GetWritingOffset()
+	q.meta.PutWatermarkOffset(segmentID, segmentOffset)
+	err := q.meta.Flush()
+	if err != nil {
+		log.Logger.Errorf("cannot flush meta file: %v", err)
+	}
+}
+
+// push writes the data into the file system. It first writes the length of the data,
+// then the data itself, so one record may span more than one segment.
+// It returns an error when the queue is full or when a segment cannot be written.
+// The writing offset in the metadata is only advanced after both writes succeed.
+func (q *Queue) push(bytes []byte) error {
+ if q.isFull() {
+ return fmt.Errorf("cannot push data when the queue is full")
+ }
+ id, offset := q.meta.GetWritingOffset()
+ id, offset, err := q.writeLength(len(bytes), id, offset)
+ if err != nil {
+ return err
+ }
+ id, offset, err = q.writeBytes(bytes, id, offset)
+ if err != nil {
+ return err
+ }
+ q.meta.PutWritingOffset(id, offset)
+ q.unflushedNum++
+ if q.unflushedNum == q.FlushCeilingNum {
+ // flushChannel is created unbuffered in Initialize, so this send waits until
+ // the flush goroutine receives it.
+ q.flushChannel <- struct{}{}
+ q.unflushedNum = 0
+ }
+ return nil
+}
+
+// pop reads the data from the file system. It first reads the length of the data,
+// then the data itself, so one record may span more than one segment.
+// It returns the data together with the segment ID and offset that follow the record,
+// which are also stored as the new reading offset. It returns an error when the queue
+// is empty or when a segment cannot be read; the reading offset is then left unchanged.
+func (q *Queue) pop() (data []byte, rid, roffset int64, err error) {
+ if q.isEmpty() {
+ return nil, 0, 0, fmt.Errorf("cannot read data when the queue is empty")
+ }
+ id, offset := q.meta.GetReadingOffset()
+ id, offset, length, err := q.readLength(id, offset)
+ if err != nil {
+ return nil, 0, 0, err
+ }
+ bytes, id, offset, err := q.readBytes(id, offset, length)
+ if err != nil {
+ return nil, 0, 0, err
+ }
+ q.meta.PutReadingOffset(id, offset)
+ return bytes, id, offset, nil
+}
+
+// readBytes reads length bytes from the memory mapped files, starting at offset in
+// segment id and continuing at the start of the following segments when needed.
+// It returns the data and the position that follows the last byte read.
+func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, newOffset int64, err error) {
+ counter := 0
+ res := make([]byte, length)
+ for {
+ segment, err := q.GetSegment(id)
+ if err != nil {
+ return nil, 0, 0, err
+ }
+ readBytes, err := segment.ReadAt(res[counter:], offset)
+ if err != nil {
+ return nil, 0, 0, err
+ }
+ counter += readBytes
+ offset += int64(readBytes)
+ // the end of the segment is reached: continue at the start of the next one.
+ if offset == int64(q.SegmentSize) {
+ id, offset = id+1, 0
+ }
+ if counter == length {
+ break
+ }
+ }
+ return res, id, offset, nil
+}
+
+// readLength reads the data length, which is stored as a uint64 (8 bytes).
+// When fewer than 8 bytes remain in the segment, the length is read from the start
+// of the next segment. It returns the position that follows the length and the length.
+func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length int, err error) {
+ if offset+uInt64Size > int64(q.SegmentSize) {
+ id, offset = id+1, 0
+ }
+ segment, err := q.GetSegment(id)
+ if err != nil {
+ return 0, 0, 0, err
+ }
+ num := segment.ReadUint64At(offset)
+ offset += uInt64Size
+ // the length ended exactly at the segment end: the data starts in the next segment.
+ if offset == int64(q.SegmentSize) {
+ id, offset = id+1, 0
+ }
+ return id, offset, int(num), nil
+}
+
+// writeLength writes the data length as a uint64 (8 bytes).
+// When fewer than 8 bytes remain in the segment, the length is written at the start
+// of the next segment. It returns the position that follows the length.
+func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int64, err error) {
+ if offset+uInt64Size > int64(q.SegmentSize) {
+ id, offset = id+1, 0
+ }
+ segment, err := q.GetSegment(id)
+ if err != nil {
+ return 0, 0, err
+ }
+ segment.WriteUint64At(uint64(length), offset)
+ offset += uInt64Size
+ // the length ended exactly at the segment end: the data starts in the next segment.
+ if offset == int64(q.SegmentSize) {
+ id, offset = id+1, 0
+ }
+ return id, offset, nil
+}
+
+// writeBytes writes bytes into the memory mapped files, starting at offset in segment id
+// and continuing at the start of the following segments when needed.
+// It returns the position that follows the last byte written.
+func (q *Queue) writeBytes(bytes []byte, id, offset int64) (newID, newOffset int64, err error) {
+ counter := 0
+ length := len(bytes)
+
+ for {
+ segment, err := q.GetSegment(id)
+ if err != nil {
+ return 0, 0, err
+ }
+ writtenBytes, err := segment.WriteAt(bytes[counter:], offset)
+ if err != nil {
+ return 0, 0, err
+ }
+ counter += writtenBytes
+ offset += int64(writtenBytes)
+ // the end of the segment is reached: continue at the start of the next one.
+ if offset == int64(q.SegmentSize) {
+ id, offset = id+1, 0
+ }
+ if counter == length {
+ break
+ }
+ }
+ return id, offset, nil
+}
+
+// isEmpty reports whether the reading offset equals the writing offset.
+func (q *Queue) isEmpty() bool {
+	readID, readOffset := q.meta.GetReadingOffset()
+	writeID, writeOffset := q.meta.GetWritingOffset()
+	if readID != writeID {
+		return false
+	}
+	return readOffset == writeOffset
+}
+
+// isFull reports whether the writing segment ID has reached the highest segment ID
+// that may be written, which is derived from the reading segment ID, the queue
+// capacity in segments and the number of segments one maximum-size event can fill.
+func (q *Queue) isFull() bool {
+ rid, _ := q.meta.GetReadingOffset()
+ wid, _ := q.meta.GetWritingOffset()
+ // ensure enough spaces to promise data stability.
+ maxWid := rid + int64(q.QueueCapacitySegments) - 1 - int64(q.MaxEventSize/q.SegmentSize)
+ return wid >= maxWid
+}
+
+// encodeOffset renders a segment ID and an offset as "<id>-<offset>" in base 10.
+func (q *Queue) encodeOffset(id, offset int64) event.Offset {
+	idPart := strconv.FormatInt(id, 10)
+	offsetPart := strconv.FormatInt(offset, 10)
+	return event.Offset(idPart + "-" + offsetPart)
+}
+
+// decodeOffset parses an offset of the form "<id>-<offset>" into the segment ID and
+// the offset. It returns 0, 0 and an error when the string does not split into exactly
+// two parts on "-" or when one of the parts is not a base-10 int64.
+func (q *Queue) decodeOffset(val event.Offset) (id, offset int64, err error) {
+	parts := strings.Split(string(val), "-")
+	if len(parts) != 2 {
+		return 0, 0, fmt.Errorf("the input offset string is illegal: %s", val)
+	}
+	id, err = strconv.ParseInt(parts[0], 10, 64)
+	if err != nil {
+		return 0, 0, err
+	}
+	offset, err = strconv.ParseInt(parts[1], 10, 64)
+	if err != nil {
+		return 0, 0, err
+	}
+	return id, offset, nil
+}
diff --git a/plugins/queue/mmap/queue_test.go b/plugins/queue/mmap/queue_test.go
new file mode 100644
index 0000000..915400e
--- /dev/null
+++ b/plugins/queue/mmap/queue_test.go
@@ -0,0 +1,402 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+ "bytes"
+ "fmt"
+ "os"
+ "reflect"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/google/go-cmp/cmp"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ "github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+// initMmapQueue registers the mmap queue plugin, builds a queue from the given
+// config (the plugin name is always set to "mmap-queue", other keys are copied
+// from cfg) and initializes it.
+// It returns an error when the registry yields no queue or when the queue
+// fails to initialize; the initialization error is included in the message.
+func initMmapQueue(cfg plugin.Config) (*Queue, error) {
+	log.Init(&log.LoggerConfig{})
+	plugin.RegisterPluginCategory(reflect.TypeOf((*api.Queue)(nil)).Elem())
+	plugin.RegisterPlugin(&Queue{})
+	var config plugin.Config = map[string]interface{}{
+		plugin.NameField: "mmap-queue",
+	}
+	for k, v := range cfg {
+		config[k] = v
+	}
+	q := api.GetQueue(config)
+	if q == nil {
+		return nil, fmt.Errorf("cannot get a default config mmap queue from the registry")
+	}
+	// keep the underlying cause so that a failing test shows why the queue is unusable.
+	if err := q.Initialize(); err != nil {
+		return nil, fmt.Errorf("queue cannot initialize: %v", err)
+	}
+	return q.(*Queue), nil
+}
+
+// cleanTestQueue removes the directory of the given mmap queue.
+// A nil queue, or a value that is not a *Queue, is ignored, so the helper is
+// safe to defer even when the queue could not be created.
+func cleanTestQueue(t *testing.T, q api.Queue) {
+	mq, ok := q.(*Queue)
+	if !ok || mq == nil {
+		// no queue was created, so there is no directory to remove.
+		return
+	}
+	if err := os.RemoveAll(mq.QueueDir); err != nil {
+		t.Errorf("cannot remove test queue dir, %v", err)
+	}
+}
+
+// getBatchEvents builds count remote log events. All events share one 2000-byte
+// payload of 'a' characters and differ by their index, which is part of the
+// name, the meta value and the data values.
+func getBatchEvents(count int) []*event.Event {
+	payload := bytes.Repeat([]byte("a"), 2000)
+	var events []*event.Event
+	for i := 0; i < count; i++ {
+		suffix := strconv.Itoa(i)
+		e := &event.Event{
+			Name:      "event" + suffix,
+			Timestamp: time.Now(),
+			Meta: map[string]string{
+				"meta": "mval" + suffix,
+			},
+			Data: map[string]interface{}{
+				"data":  "dval" + suffix,
+				"bytes": payload,
+				"index": i,
+			},
+			Type:   event.LogEvent,
+			Remote: true,
+		}
+		events = append(events, e)
+	}
+	return events
+}
+
+// getNKData returns n KiB (1024*n bytes) of the character 'a'.
+func getNKData(n int) []byte {
+	size := 1024 * n
+	return bytes.Repeat([]byte("a"), size)
+}
+
+// getLargeEvent builds one remote log event whose "data" entry carries n KiB of payload.
+func getLargeEvent(n int) *event.Event {
+	large := new(event.Event)
+	large.Name = "largeEvent"
+	large.Timestamp = time.Now()
+	large.Meta = map[string]string{"meta": "largeEvent"}
+	large.Data = map[string]interface{}{"data": getNKData(n)}
+	large.Type = event.LogEvent
+	large.Remote = true
+	return large
+}
+
+// TestQueue_Normal pushes ten events and expects Pop to return them in the
+// same order with equal content.
+func TestQueue_Normal(t *testing.T) {
+	q, err := initMmapQueue(plugin.Config{
+		"queue_dir": "TestQueue_Normal",
+	})
+	if err != nil {
+		t.Fatalf("error in initializing the mmap queue: %v", err)
+	}
+	// register the cleanup only once the queue exists; deferring it earlier would
+	// dereference a nil queue when the initialization fails.
+	defer cleanTestQueue(t, q)
+	events := getBatchEvents(10)
+	for _, e := range events {
+		if err = q.Push(e); err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+	for i := 0; i < 10; i++ {
+		sequenceEvent, err := q.Pop()
+		if err != nil {
+			t.Errorf("error in fetching data from queue: %v", err)
+		} else if !cmp.Equal(events[i], sequenceEvent.Event) {
+			t.Errorf("history data and fetching data is not equal\n,history:%+v\n. pop data:%+v\n", events[i], sequenceEvent.Event)
+		}
+	}
+}
+
+// TestQueue_ReadHistory checks that data survives closing and reopening the
+// queue: the events are pushed in batchSize rounds of batchNum events, each
+// round on a freshly opened queue, and afterwards popped and acknowledged in
+// the same round layout.
+func TestQueue_ReadHistory(t *testing.T) {
+	cfg := plugin.Config{
+		"queue_dir":    "TestQueue_ReadHistory",
+		"segment_size": 10240,
+	}
+
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("error in initializing the mmap queue: %v", err)
+	}
+	// register the cleanup only once the queue exists; deferring it earlier would
+	// dereference a nil queue when the initialization fails.
+	defer cleanTestQueue(t, q)
+	// close the queue to create a history empty queue.
+	if err := q.Close(); err != nil {
+		t.Fatalf("error in closing queue, %v", err)
+	}
+
+	// test cases.
+	batchSize := 10
+	batchNum := 100
+	events := getBatchEvents(batchSize * batchNum)
+
+	// Insert batchNum events in each of the batchSize rounds.
+	for i := 0; i < batchSize; i++ {
+		// recreate the queue
+		q, err := initMmapQueue(cfg)
+		if err != nil {
+			t.Fatalf("error in initializing the mmap queue: %v", err)
+		}
+		for j := 0; j < batchNum; j++ {
+			// every round covers its own batchNum-sized window, so each event is used exactly once.
+			index := i*batchNum + j
+			if err = q.Push(events[index]); err != nil {
+				t.Errorf("queue cannot push one event: %+v", err)
+			}
+		}
+		if err := q.Close(); err != nil {
+			t.Fatalf("error in closing queue, %v", err)
+		}
+	}
+
+	// Read batchNum events in each of the batchSize rounds.
+	for i := 0; i < batchSize; i++ {
+		// recreate the queue
+		q, err := initMmapQueue(cfg)
+		if err != nil {
+			t.Fatalf("error in initializing the mmap queue: %v", err)
+		}
+		for j := 0; j < batchNum; j++ {
+			index := i*batchNum + j
+			sequenceEvent, err := q.Pop()
+			if err != nil {
+				t.Errorf("error in fetching data from queue: %v", err)
+			} else if cmp.Equal(events[index], sequenceEvent.Event) {
+				q.Ack(sequenceEvent.Offset)
+			} else {
+				t.Errorf("history data and fetching data is not equal\n,history:%+v\n. pop data:%+v\n", events[index], sequenceEvent.Event)
+			}
+		}
+		if err := q.Close(); err != nil {
+			t.Fatalf("error in closing queue, %v", err)
+		}
+	}
+}
+
+// TestQueue_PushOverCeilingMsg pushes an event carrying 20*1024 bytes of payload
+// into a queue configured with max_event_size 1024*8 and expects Push to fail.
+func TestQueue_PushOverCeilingMsg(t *testing.T) {
+	oversized := getLargeEvent(20)
+	q, err := initMmapQueue(plugin.Config{
+		"queue_dir":      "TestQueue_PushOverCeilingMsg",
+		"segment_size":   10240,
+		"max_event_size": 1024 * 8,
+	})
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	if err = q.Push(oversized); err == nil {
+		t.Fatalf("The insertion of the over ceiling event is not as expected")
+	}
+	fmt.Printf("want err: %v", err)
+}
+
+// TestQueue_FlushWhenReachNum pushes as many events as flush_ceiling_num (5)
+// while flush_period is large, waits one second and expects the watermark
+// offset to equal the writing offset.
+func TestQueue_FlushWhenReachNum(t *testing.T) {
+	q, err := initMmapQueue(plugin.Config{
+		"queue_dir":         "TestQueue_FlushWhenReachNum",
+		"segment_size":      10240,
+		"flush_ceiling_num": 5,
+		"flush_period":      1000 * 60,
+	})
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+
+	for _, e := range getBatchEvents(5) {
+		if err = q.Push(e); err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+	time.Sleep(time.Second)
+	writeID, writeOffset := q.meta.GetWritingOffset()
+	markID, markOffset := q.meta.GetWatermarkOffset()
+	if !(writeID == markID && writeOffset == markOffset) {
+		t.Fatalf("the flush operation was not invoking when reach the flush_ceiling_num.")
+	}
+}
+
+// TestQueue_FlushPeriod pushes fewer events (5) than flush_ceiling_num (50),
+// waits two seconds with flush_period set to 1000 and expects the watermark
+// offset to equal the writing offset, i.e. the periodic flush has run.
+func TestQueue_FlushPeriod(t *testing.T) {
+	cfg := plugin.Config{
+		"queue_dir":         "TestQueue_FlushPeriod",
+		"segment_size":      10240,
+		"flush_ceiling_num": 50,
+		"flush_period":      1000 * 1,
+	}
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	events := getBatchEvents(5)
+
+	for _, e := range events {
+		err = q.Push(e)
+		if err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+	time.Sleep(time.Second * 2)
+	wID, wOffset := q.meta.GetWritingOffset()
+	wmID, wmOffset := q.meta.GetWatermarkOffset()
+	if wID != wmID || wOffset != wmOffset {
+		// the ceiling number is never reached here, so only the period can trigger the flush.
+		t.Fatalf("the flush operation was not invoked when the flush_period elapsed.")
+	}
+}
+
+// TestQueue_MemCost records the number of memory mapped segments after every
+// push of twenty events and compares the series with the expected trend for
+// 4 KiB segments and max_in_mem_segments 8.
+func TestQueue_MemCost(t *testing.T) {
+	q, err := initMmapQueue(plugin.Config{
+		"queue_dir":           "TestQueue_MemCost",
+		"segment_size":        1024 * 4,
+		"max_in_mem_segments": 8,
+	})
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+
+	var memcost []int
+	for _, e := range getBatchEvents(20) {
+		pushErr := q.Push(e)
+		// sample the counter right after the push, before reporting any failure.
+		memcost = append(memcost, q.mmapCount)
+		if pushErr != nil {
+			t.Errorf("queue cannot push one event: %+v", pushErr)
+		}
+	}
+	want := []int{1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 5, 6, 6, 7, 7, 8}
+	if !cmp.Equal(want, memcost) {
+		t.Fatalf("the memory cost trends are not in line with expectations,\n want: %v,\n but got: %v", want, memcost)
+	}
+}
+
+// TestQueue_OverSegmentEvent pushes one event that is larger than a segment
+// (10 KiB payload, 4 KiB segments) and expects the writing position to have
+// advanced to segment size*1024/SegmentSize.
+func TestQueue_OverSegmentEvent(t *testing.T) {
+	cfg := plugin.Config{
+		"queue_dir":    "TestQueue_OverSegmentEvent",
+		"segment_size": 1024 * 4,
+	}
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	size := 10
+	wantPos := size * 1024 / q.SegmentSize
+	largeEvent := getLargeEvent(size)
+	err = q.Push(largeEvent)
+	if err != nil {
+		t.Errorf("queue cannot push one event: %+v", err)
+	}
+	id, _ := q.meta.GetWritingOffset()
+	if int(id) != wantPos {
+		// report the expected segment first and the actual one second, matching the message.
+		t.Fatalf("the writing offset id should at %d segment, but at %d", wantPos, id)
+	}
+}
+
+// TestQueue_ReusingFiles alternates one push and one pop a hundred times on a
+// queue of five segments and expects both the reading and the writing segment
+// id to end up above the capacity, i.e. the segment slots were reused.
+func TestQueue_ReusingFiles(t *testing.T) {
+	q, err := initMmapQueue(plugin.Config{
+		"queue_dir":               "TestQueue_ReusingFiles",
+		"segment_size":            1024 * 4,
+		"queue_capacity_segments": 5,
+		"max_event_size":          1024 * 3,
+	})
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+
+	for round := 0; round < 100; round++ {
+		if err = q.Push(getLargeEvent(2)); err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+		if _, popErr := q.Pop(); popErr != nil {
+			t.Errorf("error in fetching data from queue: %v", popErr)
+		}
+	}
+	rid, roffset := q.meta.GetReadingOffset()
+	wid, woffset := q.meta.GetWritingOffset()
+	fmt.Printf("rid:%d, roffset:%d, wid:%d, woffset:%d\n", rid, roffset, wid, woffset)
+	capacity := q.QueueCapacitySegments
+	if !(int(wid) > capacity && int(rid) > capacity) {
+		t.Errorf("cannot valid reusing files")
+	}
+}
+
+// TestQueue_Empty pushes three events, pops them all and pops once more.
+// The additional pop may only fail with the "queue is empty" error.
+// NOTE(review): a nil error from the last Pop is tolerated by the check below;
+// confirm whether that is intended.
+func TestQueue_Empty(t *testing.T) {
+	cfg := plugin.Config{
+		// use a directory of its own, like the other tests, instead of sharing
+		// the one of TestQueue_ReusingFiles.
+		"queue_dir":               "TestQueue_Empty",
+		"segment_size":            1024 * 4,
+		"queue_capacity_segments": 10,
+	}
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	for _, e := range getBatchEvents(3) {
+		err = q.Push(e)
+		if err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+	for i := 0; i < 3; i++ {
+		if _, err = q.Pop(); err != nil {
+			t.Errorf("error in fetching data from queue: %v", err)
+		}
+	}
+	_, err = q.Pop()
+	if err != nil && err.Error() != "cannot read data when the queue is empty" {
+		t.Fatalf("not except err: %v", err)
+	}
+}
+
+// TestQueue_Full pushes eight batch events into a queue of ten 4 KiB segments
+// and then one more large event. The last push may only fail with the
+// "queue is full" error.
+// NOTE(review): a nil error from the last Push is tolerated by the check below;
+// confirm whether that is intended.
+func TestQueue_Full(t *testing.T) {
+	cfg := plugin.Config{
+		// use a directory of its own, like the other tests, instead of sharing
+		// the one of TestQueue_ReusingFiles.
+		"queue_dir":               "TestQueue_Full",
+		"segment_size":            1024 * 4,
+		"queue_capacity_segments": 10,
+	}
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	for _, e := range getBatchEvents(8) {
+		err = q.Push(e)
+		if err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+	err = q.Push(getLargeEvent(2))
+	if err != nil && err.Error() != "cannot push data when the queue is full" {
+		t.Fatalf("not except err: %v", err)
+	}
+}
diff --git a/plugins/queue/mmap/segment/segment.go b/plugins/queue/mmap/segment/segment.go
new file mode 100644
index 0000000..cf738d1
--- /dev/null
+++ b/plugins/queue/mmap/segment/segment.go
@@ -0,0 +1,68 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package segment
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+ "syscall"
+
+ "github.com/grandecola/mmap"
+)
+
+// FileSuffix is the file name suffix of the segment files.
+const FileSuffix = "_segment.dat"
+
+// NewSegment returns a pointer to a memory mapped file according to the given file name and file size.
+// The size of each segment file should be a multiple of the page size.
+//
+// Missing parent directories are created, and the file is created when absent and
+// resized to size when its current length differs. The file descriptor is
+// closed on every path before returning; on any failure no mapping is left behind.
+func NewSegment(name string, size int) (*mmap.File, error) {
+	name, err := filepath.Abs(name)
+	if err != nil {
+		return nil, fmt.Errorf("error in getting the absolute path of the segment file : %v", err)
+	}
+	dir, _ := filepath.Split(name)
+	if _, statErr := os.Stat(dir); statErr != nil && os.IsNotExist(statErr) {
+		// report the MkdirAll failure itself rather than the preceding "not exist" error.
+		if mkErr := os.MkdirAll(dir, 0744); mkErr != nil {
+			return nil, fmt.Errorf("error in creating the parent dirs of the segment file : %v", mkErr)
+		}
+	}
+
+	file, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0744)
+	if err != nil {
+		return nil, fmt.Errorf("error in opening segment file : %v", err)
+	}
+
+	segment, err := mapSegmentFile(file, size)
+	// the descriptor is not needed once the mapping exists (or has failed),
+	// so it is released unconditionally to avoid leaking it on error paths.
+	closeErr := file.Close()
+	if err != nil {
+		return nil, err
+	}
+	if closeErr != nil {
+		// best effort: the close error is the one reported, the mapping must not leak.
+		_ = segment.Unmap()
+		return nil, fmt.Errorf("error in closing the segment file : %v", closeErr)
+	}
+	return segment, nil
+}
+
+// mapSegmentFile resizes the opened file to size when needed and maps it into memory
+// for shared reading and writing. The caller stays responsible for closing file.
+func mapSegmentFile(file *os.File, size int) (*mmap.File, error) {
+	stat, err := file.Stat()
+	if err != nil {
+		return nil, fmt.Errorf("error in reading info of the segment file : %v", err)
+	}
+
+	if stat.Size() != int64(size) {
+		if err := file.Truncate(int64(size)); err != nil {
+			return nil, fmt.Errorf("error in truncating file: %v", err)
+		}
+	}
+
+	segment, err := mmap.NewSharedFileMmap(file, 0, size, syscall.PROT_READ|syscall.PROT_WRITE)
+	if err != nil {
+		return nil, fmt.Errorf("error in creating the mmap segment file : %v", err)
+	}
+	return segment, nil
+}
diff --git a/plugins/queue/mmap/segment/segment_test.go b/plugins/queue/mmap/segment/segment_test.go
new file mode 100644
index 0000000..98c38e6
--- /dev/null
+++ b/plugins/queue/mmap/segment/segment_test.go
@@ -0,0 +1,187 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package segment
+
+import (
+ "os"
+ "testing"
+
+ "github.com/grandecola/mmap"
+)
+
+// Test_newSegmentWithOldFile creates a file of originalSize beforehand and checks
+// that NewSegment leaves it with exactly neededSize bytes, whether the old file
+// was smaller than, equal to or larger than the needed size.
+func Test_newSegmentWithOldFile(t *testing.T) {
+	type args struct {
+		fileName     string
+		originalSize int
+		neededSize   int
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    int64
+		wantErr bool
+	}{
+		{
+			name: "less than the needed size",
+			args: args{
+				fileName:     "temp.segment",
+				originalSize: os.Getpagesize(),
+				neededSize:   os.Getpagesize() * 2,
+			},
+			want:    int64(os.Getpagesize() * 2),
+			wantErr: false,
+		},
+		{
+			name: "equal to the needed size",
+			args: args{
+				fileName:     "temp2.segment",
+				originalSize: os.Getpagesize() * 2,
+				neededSize:   os.Getpagesize() * 2,
+			},
+			want:    int64(os.Getpagesize() * 2),
+			wantErr: false,
+		},
+		{
+			name: "larger than the needed size",
+			args: args{
+				fileName:     "temp3.segment",
+				originalSize: os.Getpagesize() * 3,
+				neededSize:   os.Getpagesize() * 2,
+			},
+			want:    int64(os.Getpagesize() * 2),
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			file, err := os.Create(tt.args.fileName)
+			if err != nil {
+				t.Errorf("cannot create the original file: %v", err)
+				return
+			}
+			if err := file.Truncate(int64(tt.args.originalSize)); err != nil {
+				t.Errorf("cannot set the original file size: %v", err)
+			}
+			// release the handle of the prepared file before it is mapped and removed.
+			if err := file.Close(); err != nil {
+				t.Errorf("cannot close the original file: %v", err)
+			}
+
+			got, err := NewSegment(tt.args.fileName, tt.args.neededSize)
+			defer clean(got, tt.args.fileName, t)
+
+			if (err != nil) != tt.wantErr {
+				t.Errorf("NewSegment() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			// os.Stat reads the size without keeping a file handle open.
+			if stat, err := os.Stat(tt.args.fileName); err != nil {
+				t.Errorf("cannot read mmap file info: %v", err)
+			} else if stat.Size() != tt.want {
+				t.Errorf("want file size is %d ,but got %d", tt.want, stat.Size())
+			}
+		})
+	}
+}
+
+// Test_newSegmentSize checks that NewSegment creates a file of exactly the
+// requested size when no file exists beforehand.
+func Test_newSegmentSize(t *testing.T) {
+	type args struct {
+		fileName string
+		size     int
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    int64
+		wantErr bool
+	}{
+		{
+			name: "equal to page size",
+			args: args{
+				fileName: "temp2.segment",
+				size:     os.Getpagesize() * 2,
+			},
+			want:    int64(os.Getpagesize() * 2),
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := NewSegment(tt.args.fileName, tt.args.size)
+			defer clean(got, tt.args.fileName, t)
+
+			if (err != nil) != tt.wantErr {
+				t.Errorf("NewSegment() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			// os.Stat reads the size without keeping a file handle open.
+			if stat, err := os.Stat(tt.args.fileName); err != nil {
+				t.Errorf("cannot read mmap file info: %v", err)
+			} else if stat.Size() != tt.want {
+				t.Errorf("want file size is %d ,but got %d", tt.want, stat.Size())
+			}
+		})
+	}
+}
+
+// Test_newSegmentMultiDir checks that NewSegment succeeds for a file located in
+// a directory that does not exist yet. The directory is removed afterwards.
+func Test_newSegmentMultiDir(t *testing.T) {
+	const testDir = "testQueue"
+	defer func() {
+		if rmErr := os.RemoveAll(testDir); rmErr != nil {
+			t.Errorf("cannot clean the testqueue dir: %v", rmErr)
+		}
+	}()
+
+	type args struct {
+		fileName string
+		size     int
+	}
+	type testCase struct {
+		name    string
+		args    args
+		wantErr bool
+	}
+	cases := []testCase{
+		{
+			name:    "test multi dir",
+			args:    args{fileName: testDir + "/temp.segment", size: 10},
+			wantErr: false,
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			seg, err := NewSegment(tc.args.fileName, tc.args.size)
+			defer clean(seg, tc.args.fileName, t)
+			failed := err != nil
+			if failed != tc.wantErr {
+				t.Errorf("NewSegment() error = %v, wantErr %v", err, tc.wantErr)
+			}
+		})
+	}
+}
+
+// clean unmaps the given segment and deletes its file. It does nothing when
+// file is nil. Both steps are attempted and each failure is reported on t.
+func clean(file *mmap.File, fileName string, t *testing.T) {
+	if file != nil {
+		unmapErr := file.Unmap()
+		if unmapErr != nil {
+			t.Errorf("unmap segment file error: %v", unmapErr)
+		}
+		removeErr := os.Remove(fileName)
+		if removeErr != nil {
+			t.Errorf("delete segment file error: %v", removeErr)
+		}
+	}
+}
diff --git a/plugins/queue/mmap/segment_operation.go b/plugins/queue/mmap/segment_operation.go
new file mode 100644
index 0000000..9aa24d9
--- /dev/null
+++ b/plugins/queue/mmap/segment_operation.go
@@ -0,0 +1,142 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+ "context"
+ "fmt"
+ "path"
+ "strconv"
+
+ "github.com/grandecola/mmap"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns the memory mapped file that backs the given segment id,
+// mapping it first when necessary. When the number of mapped segments has
+// reached MaxInMemSegments, it signals insufficientMemChannel and waits for an
+// answer on sufficientMemChannel before mapping.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
+	slot := q.GetIndex(segmentID)
+	if q.mmapCount >= q.MaxInMemSegments {
+		// ask the swapper to release mapped segments and wait until it has answered.
+		q.insufficientMemChannel <- struct{}{}
+		<-q.sufficientMemChannel
+	}
+	if mapErr := q.mapSegment(segmentID); mapErr != nil {
+		return nil, mapErr
+	}
+	if mapped := q.segments[slot]; mapped != nil {
+		return mapped, nil
+	}
+	return nil, fmt.Errorf("cannot get a memory mapped file at %d segment", segmentID)
+}
+
+// mapSegment makes sure the slot of the given segment id holds a memory mapped
+// file. An already mapped slot is left untouched; otherwise the file
+// "<index>" + segment.FileSuffix inside QueueDir is mapped and counted.
+func (q *Queue) mapSegment(segmentID int64) error {
+	slot := q.GetIndex(segmentID)
+	if q.segments[slot] == nil {
+		fileName := strconv.Itoa(slot) + segment.FileSuffix
+		mapped, err := segment.NewSegment(path.Join(q.QueueDir, fileName), q.SegmentSize)
+		if err != nil {
+			return err
+		}
+		q.mmapCount++
+		q.segments[slot] = mapped
+	}
+	return nil
+}
+
+// unmapSegment releases the memory mapping held in the slot of the given segment id
+// and decrements the mapped segment counter. An empty slot is a no-op.
+func (q *Queue) unmapSegment(segmentID int64) error {
+	slot := q.GetIndex(segmentID)
+	mapped := q.segments[slot]
+	if mapped == nil {
+		return nil
+	}
+	unmapErr := mapped.Unmap()
+	if unmapErr != nil {
+		return fmt.Errorf("error in unmap segemnt: %v", unmapErr)
+	}
+	q.mmapCount--
+	q.segments[slot] = nil
+	return nil
+}
+
+// segmentSwapper runs in its own goroutine and bounds the number of memory mapped segments.
+// For every signal on insufficientMemChannel it swaps segments out when the
+// mapped count has reached MaxInMemSegments and then always answers on
+// sufficientMemChannel, so the waiting sender can continue even if the swap failed.
+// It returns when the queue context is done and marks showDownWg as done on exit.
+func (q *Queue) segmentSwapper() {
+	defer q.showDownWg.Done()
+	ctx, cancel := context.WithCancel(q.ctx)
+	defer cancel()
+	for {
+		select {
+		case <-q.insufficientMemChannel:
+			if q.mmapCount >= q.MaxInMemSegments {
+				// keep the cause in the log instead of dropping the swap error.
+				if err := q.doSwap(); err != nil {
+					log.Logger.Errorf("cannot get enough memory to receive new data: %v", err)
+				}
+			}
+			q.sufficientMemChannel <- struct{}{}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// doSwap unmaps memory mapped segments until enough slots are free again, to bound
+// the memory cost of the queue. The slots of the current reading and writing
+// segments are never unmapped. It returns the first unmapping error, if any.
+// NOTE(review): the outer loop only ends through one of the inner returns or once
+// mmapCount drops below MaxInMemSegments; confirm it cannot spin when no
+// candidate slot is mapped.
+func (q *Queue) doSwap() error {
+ rID, _ := q.meta.GetReadingOffset()
+ wID, _ := q.meta.GetWritingOffset()
+ // shift both ids by one full capacity: the slot index (id modulo capacity) stays
+ // the same, while the window start computed below is less likely to be negative.
+ logicRID := rID + int64(q.QueueCapacitySegments)
+ logicWID := wID + int64(q.QueueCapacitySegments)
+ wIndex := q.GetIndex(wID)
+ rIndex := q.GetIndex(rID)
+ for q.mmapCount >= q.MaxInMemSegments {
+ // first pass: the MaxInMemSegments ids right before the reading segment, in ascending order.
+ for i := logicRID - int64(q.MaxInMemSegments); i >= 0 && i < logicRID; i++ {
+ if q.GetIndex(i) == wIndex || q.GetIndex(i) == rIndex {
+ continue
+ }
+
+ if err := q.unmapSegment(i); err != nil {
+ return err
+ }
+ // the writing segment and the reading segment should stay in memory.
+ // stop as soon as at least MaxInMemSegments/2-1 slots are free, keeping about half
+ // of the allowed slots available to receive new data.
+ if q.MaxInMemSegments-q.mmapCount >= q.MaxInMemSegments/2-1 {
+ return nil
+ }
+ }
+ // second pass: the MaxInMemSegments ids right before the writing segment, in descending order.
+ for i := logicWID - 1; i >= 0 && i >= logicWID-int64(q.MaxInMemSegments); i-- {
+ if q.GetIndex(i) == wIndex || q.GetIndex(i) == rIndex {
+ continue
+ }
+
+ if err := q.unmapSegment(i); err != nil {
+ return err
+ }
+ // the writing segment and the reading segment should stay in memory.
+ // stop as soon as at least MaxInMemSegments/2-1 slots are free, keeping about half
+ // of the allowed slots available to receive new data.
+ if q.MaxInMemSegments-q.mmapCount >= q.MaxInMemSegments/2-1 {
+ return nil
+ }
+ }
+ }
+ return nil
+}
+
+// GetIndex maps a segment id to its slot in the segments: the id modulo QueueCapacitySegments.
+func (q *Queue) GetIndex(segmentID int64) int {
+	capacity := q.QueueCapacitySegments
+	return int(segmentID) % capacity
+}
diff --git a/plugins/queue/mmap/serializer.go b/plugins/queue/mmap/serializer.go
new file mode 100644
index 0000000..38a5dd1
--- /dev/null
+++ b/plugins/queue/mmap/serializer.go
@@ -0,0 +1,74 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+ "bytes"
+
+ "encoding/gob"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/event"
+)
+
+// Decoder is used in the pop operation for reusing the gob.Decoder and its buffer.
+type Decoder struct {
+ // buf is the buffer the decoder reads from.
+ buf *bytes.Buffer
+ // decoder is the gob decoder bound to buf.
+ decoder *gob.Decoder
+}
+
+// Encoder is used in the push operation for reusing the gob.Encoder and its buffer.
+type Encoder struct {
+ // buf is the buffer the encoder writes to.
+ buf *bytes.Buffer
+ // encoder is the gob encoder bound to buf.
+ encoder *gob.Encoder
+}
+
+// NewDecoder returns a Decoder whose gob decoder reads from a fresh internal buffer.
+func NewDecoder() *Decoder {
+	d := &Decoder{buf: new(bytes.Buffer)}
+	d.decoder = gob.NewDecoder(d.buf)
+	return d
+}
+
+// NewEncoder returns an Encoder whose gob encoder writes to a fresh internal buffer.
+func NewEncoder() *Encoder {
+	e := &Encoder{buf: new(bytes.Buffer)}
+	e.encoder = gob.NewEncoder(e.buf)
+	return e
+}
+
+// deserialize decodes one event from the given bytes with the reused gob decoder.
+// The internal buffer is reset when the call returns, whether or not decoding succeeded.
+// NOTE(review): the gob decoder is shared across calls and therefore keeps
+// stream state between them; confirm this matches how the encoder side is reused.
+func (d *Decoder) deserialize(b []byte) (*event.Event, error) {
+	defer d.buf.Reset()
+	d.buf.Write(b)
+	decoded := new(event.Event)
+	if err := d.decoder.Decode(decoded); err != nil {
+		return nil, err
+	}
+	return decoded, nil
+}
+
+// serialize encodes the event with the reused gob encoder and returns the encoded bytes.
+// The result is a copy owned by the caller: the internal buffer is reset when
+// the call returns and is overwritten by the next call, so its backing array
+// must not be handed out.
+func (e *Encoder) serialize(data *event.Event) ([]byte, error) {
+	defer e.buf.Reset()
+	if err := e.encoder.Encode(data); err != nil {
+		return nil, err
+	}
+	encoded := make([]byte, e.buf.Len())
+	copy(encoded, e.buf.Bytes())
+	return encoded, nil
+}
diff --git a/plugins/queue/api/queue_repository.go b/plugins/queue/queue_repository.go
similarity index 76%
rename from plugins/queue/api/queue_repository.go
rename to plugins/queue/queue_repository.go
index 11031d0..1c49822 100644
--- a/plugins/queue/api/queue_repository.go
+++ b/plugins/queue/queue_repository.go
@@ -15,26 +15,24 @@
// specific language governing permissions and limitations
// under the License.
-package api
+package queue
import (
"reflect"
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ "github.com/apache/skywalking-satellite/plugins/queue/api"
+ "github.com/apache/skywalking-satellite/plugins/queue/mmap"
)
-// GetQueue an initialized filter plugin.
-func GetQueue(config plugin.Config) Queue {
- return plugin.Get(reflect.TypeOf((*Queue)(nil)).Elem(), config).(Queue)
-}
-
// RegisterQueuePlugins register the used queue plugins.
func RegisterQueuePlugins() {
- plugin.RegisterPluginCategory(reflect.TypeOf((*Queue)(nil)).Elem())
- queues := []Queue{
+ plugin.RegisterPluginCategory(reflect.TypeOf((*api.Queue)(nil)).Elem())
+ queues := []api.Queue{
+ &mmap.Queue{},
// Please register the queue plugins at here.
}
- for _, queue := range queues {
- plugin.RegisterPlugin(queue)
+ for _, q := range queues {
+ plugin.RegisterPlugin(q)
}
}
diff --git a/plugins/receiver/api/receiver.go b/plugins/receiver/api/receiver.go
index 9e47692..ab04d9c 100644
--- a/plugins/receiver/api/receiver.go
+++ b/plugins/receiver/api/receiver.go
@@ -31,5 +31,5 @@ type Receiver interface {
RegisterHandler(server api.Server)
// Channel would be put a data when the receiver receives an APM data.
- Channel() <-chan event.SerializableEvent
+ Channel() <-chan *event.Event
}