You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2020/12/16 07:54:26 UTC

[skywalking-satellite] 01/01: mmap-queue-plugin

This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch mmap-queue
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git

commit f2dfcfe1cf6a4881bf5a9d3537cb3661fbe5ee6b
Author: Evan <ev...@outlook.com>
AuthorDate: Wed Dec 16 15:53:59 2020 +0800

    mmap-queue-plugin
---
 Makefile                                           |   5 +-
 dist/LICENSE                                       |   3 +-
 dist/licenses/LICENSE-go-cmp                       |  27 ++
 dist/licenses/LICENSE-mmap                         |  21 ++
 docs/configuration/queue.md                        |  11 +
 docs/plugins/queue/mmap/README.md                  |  66 ++++
 go.mod                                             |   3 +-
 go.sum                                             |  18 +-
 internal/pkg/event/event.go                        |  53 +--
 internal/satellite/event/event.go                  | 109 ------
 .../satellite/module/gatherer/fetcher_gatherer.go  |  26 +-
 .../satellite/module/gatherer/receiver_gatherer.go |  25 +-
 internal/satellite/module/processor/processor.go   |   2 +-
 internal/satellite/module/sender/sender.go         |   4 +-
 plugins/fetcher/api/fetcher.go                     |   2 +-
 plugins/init.go                                    |   2 +-
 plugins/parser/api/parser.go                       |   4 +-
 plugins/queue/api/queue.go                         |  17 +-
 plugins/queue/mmap/README.md                       |   1 -
 plugins/queue/mmap/branchmark_test.go              | 109 ++++++
 plugins/queue/mmap/meta/meta.go                    | 153 ++++++++
 plugins/queue/mmap/meta/meta_test.go               | 208 +++++++++++
 plugins/queue/mmap/queue.go                        | 192 ++++++++++
 plugins/queue/mmap/queue_opreation.go              | 236 ++++++++++++
 plugins/queue/mmap/queue_test.go                   | 402 +++++++++++++++++++++
 plugins/queue/mmap/segment/segment.go              |  68 ++++
 plugins/queue/mmap/segment/segment_test.go         | 187 ++++++++++
 plugins/queue/mmap/segment_operation.go            | 142 ++++++++
 plugins/queue/mmap/serializer.go                   |  74 ++++
 plugins/queue/{api => }/queue_repository.go        |  18 +-
 plugins/receiver/api/receiver.go                   |   2 +-
 31 files changed, 2004 insertions(+), 186 deletions(-)

diff --git a/Makefile b/Makefile
index dbbc0e8..9ff7285 100644
--- a/Makefile
+++ b/Makefile
@@ -25,6 +25,7 @@ RELEASE_SRC = skywalking-satellite-$(VERSION)-src
 OS = $(shell uname)
 
 GO = go
+GIT = git
 GO_PATH = $$($(GO) env GOPATH)
 GO_BUILD = $(GO) build
 GO_GET = $(GO) get
@@ -36,7 +37,7 @@ GO_BUILD_FLAGS = -v
 GO_BUILD_LDFLAGS = -X main.version=$(VERSION)
 GQL_GEN = $(GO_PATH)/bin/gqlgen
 
-PLATFORMS := windows linux darwin
+PLATFORMS := linux darwin
 os = $(word 1, $@)
 ARCH = amd64
 
@@ -72,7 +73,7 @@ clean: tools
 	-rm -rf coverage.txt
 
 .PHONY: build
-build: deps windows linux darwin
+build: deps linux darwin
 
 
 .PHONY: $(PLATFORMS)
diff --git a/dist/LICENSE b/dist/LICENSE
index 618fe55..38f0d7a 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -222,7 +222,7 @@ BSD licenses
 
 The following components are provided under a BSD license. See project link for details.
 The text of each license is also included at licenses/LICENSE-[project].txt.
-
+    google (go-cmp) v0.5.4 https://github.com/google/go-cmp BSD
 
 ========================================================================
 MIT licenses
@@ -234,3 +234,4 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
 	sirupsen (logrus) v1.7.0: https://github.com/sirupsen/logrus MIT
 	spf13 (viper) v1.7.1: https://github.com/spf13/viper MIT
 	urfave (cli) v2.3.0: https://github.com/urfave/cli MIT
+	grandecola (mmap) v0.6.0: https://github.com/grandecola/mmap MIT
diff --git a/dist/licenses/LICENSE-go-cmp b/dist/licenses/LICENSE-go-cmp
new file mode 100644
index 0000000..32017f8
--- /dev/null
+++ b/dist/licenses/LICENSE-go-cmp
@@ -0,0 +1,27 @@
+Copyright (c) 2017 The Go Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+   * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+   * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+   * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/dist/licenses/LICENSE-mmap b/dist/licenses/LICENSE-mmap
new file mode 100644
index 0000000..be28ae0
--- /dev/null
+++ b/dist/licenses/LICENSE-mmap
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Aman Mangal
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/docs/configuration/queue.md b/docs/configuration/queue.md
new file mode 100644
index 0000000..b7d81b1
--- /dev/null
+++ b/docs/configuration/queue.md
@@ -0,0 +1,11 @@
+# queue configuration
+
+| Type | Param | DefaultValue | Meaning |
+| ---- | ---- | ---- | ---- |
+| mmap-queue | segment_size | 131072 | The size of each segment. The unit is byte. |
+| mmap-queue | max_in_mem_segments | 10 | The max number of segments in memory. |
+| mmap-queue | queue_capacity_segments | 4000 | The capacity of the queue = segment_size * queue_capacity_segments. |
+| mmap-queue | flush_period | 1000 | The flush period. The unit is ms. |
+| mmap-queue | flush_ceiling_num | 10000 | The max number in one flush period. |
+| mmap-queue | queue_dir | satellite-mmap-queue | The directory that contains all files of the queue. |
+| mmap-queue | max_event_size | 20480 | The max size of the input event. The unit is byte. |
diff --git a/docs/plugins/queue/mmap/README.md b/docs/plugins/queue/mmap/README.md
new file mode 100644
index 0000000..9c8379c
--- /dev/null
+++ b/docs/plugins/queue/mmap/README.md
@@ -0,0 +1,66 @@
+# Design
+The mmap-queue is a big, fast, and persistent queue based on memory-mapped files. Each mmap-queue has a directory that stores all of its data. The queue directory is made up of many segments and one meta file.
+
+- Segment: A segment is where the data is actually stored. By using mmap, it provides large storage space while keeping the loss of read and write performance as small as possible. Segment files are reused rather than deleted.
+- Meta: The purpose of the meta file is to locate the data that the consumer needs.
+
+## Meta
+The metadata only needs 80B to store the state of the pipe. However, for memory alignment, it takes up at least one memory page, which is generally 4KB.
+```
+[   8Byte   ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte  ]
+[metaVersion][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][capacity]
+[metaVersion][writing   offset][watermark offset][committed offset][reading   offset][capacity]
+
+```
+### Transforming
+
+![](https://skywalking.apache.org/blog/2020-11-25-skywalking-satellite-0.1.0-design/offset-convert.jpg)
+
+## Configuration
+[Configuration Params](../../../configuration/queue.md)
+
+## Segment
+Segments are a series of files of the same size. Each input costs `8Byte+Len(data)Byte` to store the raw bytes. The first 8 bytes hold `Len(data)`, which is used to find the ending position.
+### Swapper
+To balance performance and resource usage, we define a page replacement policy.
+
+- Keep the reading and writing segments in memory.
+- When the mmap count is greater than or equal to max_in_mem_segments, we first scan the read scope and then the written scope to swap out segments, which guarantees that the reading and writing segments are always in memory.
+    - Read scope: [reading_offset - max_in_mem_segments,reading_offset - 1]
+    - Written scope: [writing_offset - max_in_mem_segments,writing_offset - 1]
+    - Each swap operation guarantees that at least `max_in_mem_segments/2-1` capacity is available. The subtraction accounts for the segments that must always stay in memory.
+
+## BenchmarkTest
+Base machine: macbook pro 2018 Intel Core i7 16 GB 2400 MHz DDR4 SSD
+
+### push operation
+
+```
+BenchmarkPush
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:8KB
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:8KB-12         	   42470	     25098 ns/op	     466 B/op	      10 allocs/op
+
+BenchmarkPush/segmentSize:_256KB_maxInMemSegments:10_message:8KB
+BenchmarkPush/segmentSize:_256KB_maxInMemSegments:10_message:8KB-12         	   76905	     18910 ns/op	     418 B/op	      10 allocs/op
+
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:20_message:8KB
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:20_message:8KB-12         	   58221	     22258 ns/op	     465 B/op	      10 allocs/op
+
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:16KB
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:16KB-12        	   34053	     48635 ns/op	     566 B/op	      11 allocs/op
+```
+### push and pop operation
+```
+BenchmarkPushAndPop
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:8KB
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:8KB-12         	   22273	     45872 ns/op	   19512 B/op	      40 allocs/op
+
+BenchmarkPushAndPop/segmentSize:_256KB_maxInMemSegments:10_message:8KB
+BenchmarkPushAndPop/segmentSize:_256KB_maxInMemSegments:10_message:8KB-12         	   38874	     37169 ns/op	   19456 B/op	      40 allocs/op
+
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:20_message:8KB
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:20_message:8KB-12         	   38048	     36274 ns/op	   19514 B/op	      40 allocs/op
+
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:16KB
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:16KB-12        	   19768	     63399 ns/op	   36893 B/op	      41 allocs/op
+```
diff --git a/go.mod b/go.mod
index 3bf717f..1c012a9 100644
--- a/go.mod
+++ b/go.mod
@@ -3,8 +3,9 @@ module github.com/apache/skywalking-satellite
 go 1.14
 
 require (
+	github.com/google/go-cmp v0.5.4
+	github.com/grandecola/mmap v0.6.0
 	github.com/sirupsen/logrus v1.7.0
 	github.com/spf13/viper v1.7.1
 	github.com/urfave/cli/v2 v2.3.0
 )
-
diff --git a/go.sum b/go.sum
index e84cc08..23f05df 100644
--- a/go.sum
+++ b/go.sum
@@ -11,6 +11,7 @@ cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqCl
 cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
 cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
+github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
@@ -59,15 +60,22 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
 github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
+github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
 github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
 github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
 github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
 github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
 github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
 github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
 github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/grandecola/mmap v0.6.0 h1:dYrZWLay1rDlmlsGsSIoXGQ+JMu/t2ZnKt8vT1h+1o0=
+github.com/grandecola/mmap v0.6.0/go.mod h1:q5v9jpm393rcp5PXE6biArHKc2SWJBpXjfxSRtQMtNU=
 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
 github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
@@ -95,14 +103,17 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J
 github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
 github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
+github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
 github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
@@ -150,7 +161,9 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
 github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
+github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
 github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
 github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
@@ -168,11 +181,11 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
-github.com/urfave/cli v1.22.5 h1:lNq9sAHXK2qfdI8W+GRItjCEkI+2oR4d+MEHy1CKXoU=
 github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M=
 github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
@@ -267,6 +280,8 @@ golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtn
 golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc h1:NCy3Ohtk6Iny5V/reW2Ktypo4zIpWBdRJ1uFMjBxdg8=
 golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
 google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
 google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
@@ -290,6 +305,7 @@ google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiq
 google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
 gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
diff --git a/internal/pkg/event/event.go b/internal/pkg/event/event.go
index a834fae..2ed10cf 100644
--- a/internal/pkg/event/event.go
+++ b/internal/pkg/event/event.go
@@ -39,56 +39,31 @@ type Type int32
 // Offset is a generic form, which allows having different definitions in different Queues.
 type Offset string
 
-// Event that implement this interface would be allowed to transmit in the Satellite.
-type Event interface {
-	// Name returns the event name.
-	Name() string
-
-	// Meta is a pair of key and value to record meta data, such as labels.
-	Meta() map[string]string
-
-	// Data returns the wrapped data.
-	Data() interface{}
-
-	// Time returns the event time.
-	Time() time.Time
-
-	// Type is to distinguish different events.
-	Type() Type
-
-	// IsRemote means is a output event when returns true.
-	IsRemote() bool
-}
-
-// SerializableEvent is used in Collector to bridge Queue.
-type SerializableEvent interface {
-	Event
-
-	// ToBytes serialize the event to a byte array.
-	ToBytes() []byte
-
-	// FromBytes deserialize the byte array to an event.
-	FromBytes(bytes []byte) SerializableEvent
+type Event struct {
+	Name      string
+	Timestamp time.Time
+	Meta      map[string]string
+	Type      Type
+	Remote    bool
+	Data      map[string]interface{}
 }
 
 // BatchEvents is used by Forwarder to forward.
-type BatchEvents []Event
+type BatchEvents []*Event
 
 // OutputEventContext is a container to store the output context.
 type OutputEventContext struct {
-	Context map[string]Event
+	Context map[string]*Event
 	Offset  Offset
 }
 
-// Put puts the incoming event into the context when the event is a remote event.
-func (c *OutputEventContext) Put(event Event) {
-	if event.IsRemote() {
-		c.Context[event.Name()] = event
-	}
+// Put puts the incoming event into the context.
+func (c *OutputEventContext) Put(event *Event) {
+	c.Context[event.Name] = event
 }
 
-// Get returns a event in the context. When the eventName does not exist, a error would be returned.
-func (c *OutputEventContext) Get(eventName string) (Event, error) {
+// Get returns an event in the context. When the eventName does not exist, an error would be returned.
+func (c *OutputEventContext) Get(eventName string) (*Event, error) {
 	e, ok := c.Context[eventName]
 	if !ok {
 		err := fmt.Errorf("cannot find the event name in OutputEventContext : %s", eventName)
diff --git a/internal/satellite/event/event.go b/internal/satellite/event/event.go
deleted file mode 100644
index a53720b..0000000
--- a/internal/satellite/event/event.go
+++ /dev/null
@@ -1,109 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package event
-
-import (
-	"time"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/event"
-)
-
-// Event defines the common fields.
-type Event struct {
-	name      string
-	timestamp time.Time
-	meta      map[string]string
-	eventType event.Type
-	remote    bool
-}
-
-// ETBFunc serialize event to bytes.
-type ETBFunc func(event event.SerializableEvent) []byte
-
-// BToFunc deserialize bytes to event.
-type BToFunc func(bytes []byte) event.SerializableEvent
-
-// StructuredEvent works when the data is a struct type.
-type StructuredEvent struct {
-	Event
-	data interface{}
-}
-
-// StructuredEvent works when the data is not a struct type.
-type UnstructuredEvent struct {
-	Event
-	data map[string]interface{}
-}
-
-// StructuredEvent works when the data is a struct type in the collector.
-type StructuredSerializableEvent struct {
-	StructuredEvent
-	etb ETBFunc
-	bte BToFunc
-}
-
-// StructuredEvent works when the data is not a struct type in the collector.
-type UnstructuredSerializableEvent struct {
-	UnstructuredEvent
-	etb ETBFunc
-	bte BToFunc
-}
-
-func (s *Event) Name() string {
-	return s.name
-}
-
-func (s *Event) Meta() map[string]string {
-	return s.meta
-}
-
-func (s *Event) Time() time.Time {
-	return s.timestamp
-}
-
-func (s *Event) Type() event.Type {
-	return s.eventType
-}
-
-func (s *Event) IsRemote() bool {
-	return s.remote
-}
-
-func (u *StructuredEvent) Data() interface{} {
-	return u.data
-}
-
-func (u *UnstructuredEvent) Data() interface{} {
-	return u.data
-}
-
-func (s *StructuredSerializableEvent) ToBytes() []byte {
-	return s.etb(s)
-}
-
-func (s *StructuredSerializableEvent) FromBytes(bytes []byte) event.SerializableEvent {
-	return s.bte(bytes)
-}
-
-func (u *UnstructuredSerializableEvent) ToBytes() []byte {
-	return u.etb(u)
-}
-
-func (u *UnstructuredSerializableEvent) FromBytes(bytes []byte) event.SerializableEvent {
-	return u.bte(bytes)
-}
diff --git a/internal/satellite/module/gatherer/fetcher_gatherer.go b/internal/satellite/module/gatherer/fetcher_gatherer.go
index 77ac6d0..f218657 100644
--- a/internal/satellite/module/gatherer/fetcher_gatherer.go
+++ b/internal/satellite/module/gatherer/fetcher_gatherer.go
@@ -47,7 +47,7 @@ func (f *FetcherGatherer) Prepare() error {
 
 func (f *FetcherGatherer) Boot(ctx context.Context) {
 	var wg sync.WaitGroup
-	wg.Add(1)
+	wg.Add(2)
 	go func() {
 		defer wg.Done()
 		timeTicker := time.NewTicker(time.Duration(f.config.FetchInterval) * time.Millisecond)
@@ -55,21 +55,37 @@ func (f *FetcherGatherer) Boot(ctx context.Context) {
 			select {
 			case <-timeTicker.C:
 				events := f.runningFetcher.Fetch()
-				for _, event := range events {
-					err := f.runningQueue.Push(event)
+				for _, e := range events {
+					err := f.runningQueue.Push(e)
 					if err != nil {
 						// todo add abandonedCount metrics
 						log.Logger.Errorf("cannot put event into queue in %s namespace, %v", f.config.NamespaceName, err)
 					}
 				}
-			case e := <-f.runningQueue.Pop():
-				f.outputChannel <- e
 			case <-ctx.Done():
 				f.Shutdown()
 				return
 			}
 		}
 	}()
+
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-ctx.Done():
+				f.Shutdown()
+				return
+			default:
+				if e, err := f.runningQueue.Pop(); err == nil {
+					f.outputChannel <- e
+				} else {
+					log.Logger.Errorf("error in popping from the queue: %v", err)
+					time.Sleep(time.Second)
+				}
+			}
+		}
+	}()
 	wg.Wait()
 }
 
diff --git a/internal/satellite/module/gatherer/receiver_gatherer.go b/internal/satellite/module/gatherer/receiver_gatherer.go
index 5735474..de812a5 100644
--- a/internal/satellite/module/gatherer/receiver_gatherer.go
+++ b/internal/satellite/module/gatherer/receiver_gatherer.go
@@ -20,6 +20,7 @@ package gatherer
 import (
 	"context"
 	"sync"
+	"time"
 
 	"github.com/apache/skywalking-satellite/internal/pkg/event"
 	"github.com/apache/skywalking-satellite/internal/pkg/log"
@@ -45,7 +46,7 @@ type ReceiverGatherer struct {
 func (r *ReceiverGatherer) Prepare() error {
 	log.Logger.Infof("receiver gatherer module of %s namespace is preparing", r.config.NamespaceName)
 	r.runningReceiver.RegisterHandler(r.runningServer)
-	if err := r.runningQueue.Prepare(); err != nil {
+	if err := r.runningQueue.Initialize(); err != nil {
 		log.Logger.Infof("the %s queue of %s namespace was failed to initialize", r.runningQueue.Name(), r.config.NamespaceName)
 		return err
 	}
@@ -54,7 +55,7 @@ func (r *ReceiverGatherer) Prepare() error {
 
 func (r *ReceiverGatherer) Boot(ctx context.Context) {
 	var wg sync.WaitGroup
-	wg.Add(1)
+	wg.Add(2)
 	go func() {
 		defer wg.Done()
 		for {
@@ -65,14 +66,30 @@ func (r *ReceiverGatherer) Boot(ctx context.Context) {
 					// todo add abandonedCount metrics
 					log.Logger.Errorf("cannot put event into queue in %s namespace, error is: %v", r.config.NamespaceName, err)
 				}
-			case e := <-r.runningQueue.Pop():
-				r.outputChannel <- e
 			case <-ctx.Done():
 				r.Shutdown()
 				return
 			}
 		}
 	}()
+
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-ctx.Done():
+				r.Shutdown()
+				return
+			default:
+				if e, err := r.runningQueue.Pop(); err == nil {
+					r.outputChannel <- e
+				} else {
+					log.Logger.Errorf("error in popping from the queue: %v", err)
+					time.Sleep(time.Second)
+				}
+			}
+		}
+	}()
 	wg.Wait()
 }
 
diff --git a/internal/satellite/module/processor/processor.go b/internal/satellite/module/processor/processor.go
index 640f605..974741a 100644
--- a/internal/satellite/module/processor/processor.go
+++ b/internal/satellite/module/processor/processor.go
@@ -60,7 +60,7 @@ func (p *Processor) Boot(ctx context.Context) {
 			case e := <-p.gatherer.OutputDataChannel():
 				c := &event.OutputEventContext{
 					Offset:  e.Offset,
-					Context: make(map[string]event.Event),
+					Context: make(map[string]*event.Event),
 				}
 				c.Put(e.Event)
 				// processing the event with filters, that put the necessary events to OutputEventContext.
diff --git a/internal/satellite/module/sender/sender.go b/internal/satellite/module/sender/sender.go
index 19a469b..37c8b1c 100644
--- a/internal/satellite/module/sender/sender.go
+++ b/internal/satellite/module/sender/sender.go
@@ -134,8 +134,8 @@ func (s *Sender) consume(batch *buffer.BatchBuffer) {
 	for i := 0; i < batch.Len(); i++ {
 		eventContext := batch.Buf()[i]
 		for _, e := range eventContext.Context {
-			if e.IsRemote() {
-				events[e.Type()] = append(events[e.Type()], e)
+			if e.Remote {
+				events[e.Type] = append(events[e.Type], e)
 			}
 		}
 	}
diff --git a/plugins/fetcher/api/fetcher.go b/plugins/fetcher/api/fetcher.go
index 4021473..83139a9 100644
--- a/plugins/fetcher/api/fetcher.go
+++ b/plugins/fetcher/api/fetcher.go
@@ -27,5 +27,5 @@ type Fetcher interface {
 	plugin.Plugin
 
 	// Fetch would fetch some APM data.
-	Fetch() []event.SerializableEvent
+	Fetch() event.BatchEvents
 }
diff --git a/plugins/init.go b/plugins/init.go
index 2bf3bf4..97f0177 100644
--- a/plugins/init.go
+++ b/plugins/init.go
@@ -24,7 +24,7 @@ import (
 	filter "github.com/apache/skywalking-satellite/plugins/filter/api"
 	forwarder "github.com/apache/skywalking-satellite/plugins/forwarder/api"
 	parser "github.com/apache/skywalking-satellite/plugins/parser/api"
-	queue "github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue"
 	receiver "github.com/apache/skywalking-satellite/plugins/receiver/api"
 	server "github.com/apache/skywalking-satellite/plugins/server/api"
 )
diff --git a/plugins/parser/api/parser.go b/plugins/parser/api/parser.go
index 754037f..3e09e3a 100644
--- a/plugins/parser/api/parser.go
+++ b/plugins/parser/api/parser.go
@@ -27,8 +27,8 @@ type Parser interface {
 	plugin.Plugin
 
 	// ParseBytes parse the byte buffer into events.
-	ParseBytes(bytes []byte) ([]event.SerializableEvent, error)
+	ParseBytes(bytes []byte) (event.BatchEvents, error)
 
 	// ParseStr parse the string into events.
-	ParseStr(str string) ([]event.SerializableEvent, error)
+	ParseStr(str string) (event.BatchEvents, error)
 }
diff --git a/plugins/queue/api/queue.go b/plugins/queue/api/queue.go
index f25b2ff..a5ad163 100644
--- a/plugins/queue/api/queue.go
+++ b/plugins/queue/api/queue.go
@@ -18,6 +18,8 @@
 package api
 
 import (
+	"reflect"
+
 	"github.com/apache/skywalking-satellite/internal/pkg/event"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
@@ -26,14 +28,14 @@ import (
 type Queue interface {
 	plugin.Plugin
 
-	// Prepare creates the queue.
-	Prepare() error
+	// Initialize creates the queue.
+	Initialize() error
 
 	// Push a inputEvent into the queue.
-	Push(event event.SerializableEvent) error
+	Push(event *event.Event) error
 
 	// Pop returns a SequenceEvent when Queue is not empty,
-	Pop() chan *SequenceEvent
+	Pop() (*SequenceEvent, error)
 
 	// Close would close the queue.
 	Close() error
@@ -44,6 +46,11 @@ type Queue interface {
 
 // SequenceEvent is a wrapper to pass the event and the offset.
 type SequenceEvent struct {
-	Event  event.Event
+	Event  *event.Event
 	Offset event.Offset
 }
+
+// GetQueue asks plugin.Get for the plugin of the Queue interface type built from config
+// and returns it as a Queue. The unchecked type assertion panics when the result is not a Queue.
+func GetQueue(config plugin.Config) Queue {
+	return plugin.Get(reflect.TypeOf((*Queue)(nil)).Elem(), config).(Queue)
+}
diff --git a/plugins/queue/mmap/README.md b/plugins/queue/mmap/README.md
deleted file mode 100644
index 3f03ea1..0000000
--- a/plugins/queue/mmap/README.md
+++ /dev/null
@@ -1 +0,0 @@
-# Plugin description
\ No newline at end of file
diff --git a/plugins/queue/mmap/branchmark_test.go b/plugins/queue/mmap/branchmark_test.go
new file mode 100644
index 0000000..4bf9ee4
--- /dev/null
+++ b/plugins/queue/mmap/branchmark_test.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"fmt"
+	"os"
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+// benchmarkParam is one combination of settings exercised by the benchmarks in this file.
+type benchmarkParam struct {
+	segmentSize      int // passed to the queue as "segment_size"
+	message          int // unit KB
+	maxInMemSegments int // passed to the queue as "max_in_mem_segments"
+}
+
+// params lists the benchmark cases. The first entry is the baseline; each following
+// entry changes exactly one field of it.
+var params = []benchmarkParam{
+	{segmentSize: 1024 * 128, message: 8, maxInMemSegments: 10},
+	// compare the influence of the segmentSize.
+	{segmentSize: 1024 * 256, message: 8, maxInMemSegments: 10},
+	// compare the influence of the maxInMemSegments.
+	{segmentSize: 1024 * 128, message: 8, maxInMemSegments: 20},
+	// compare the influence of the message size.
+	{segmentSize: 1024 * 128, message: 16, maxInMemSegments: 10},
+}
+
+// cleanBenchmarkQueue removes the directory of the given mmap queue and reports a
+// benchmark error when the removal fails. q must hold a *Queue.
+func cleanBenchmarkQueue(b *testing.B, q api.Queue) {
+	dir := q.(*Queue).QueueDir
+	err := os.RemoveAll(dir)
+	if err != nil {
+		b.Errorf("cannot remove test queue dir, %v", err)
+	}
+}
+
+// BenchmarkPush measures Queue.Push, running one sub-benchmark per entry of params.
+// Each sub-benchmark creates its own queue, closes it and removes its directory afterwards.
+func BenchmarkPush(b *testing.B) {
+	for _, p := range params {
+		title := fmt.Sprintf("segmentSize: %dKB maxInMemSegments:%d message:%dKB", p.segmentSize/1024, p.maxInMemSegments, p.message)
+		b.Run(title, func(b *testing.B) {
+			cfg := plugin.Config{
+				"queue_dir":               "BenchmarkPush",
+				"segment_size":            p.segmentSize,
+				"max_in_mem_segments":     p.maxInMemSegments,
+				"queue_capacity_segments": 10000,
+			}
+			q, err := initMmapQueue(cfg)
+			if err != nil {
+				b.Fatalf("cannot get a mmap queue: %v", err)
+			}
+			payload := getLargeEvent(p.message)
+			b.ReportAllocs()
+			b.ResetTimer()
+			println()
+			for i := 0; i < b.N; i++ {
+				pushErr := q.Push(payload)
+				if pushErr != nil {
+					b.Fatalf("error in pushing: %v", pushErr)
+				}
+			}
+			b.StopTimer()
+			// The close error is deliberately ignored: the queue directory is removed right after.
+			_ = q.Close()
+			cleanBenchmarkQueue(b, q)
+		})
+	}
+}
+
+// BenchmarkPushAndPop measures one Queue.Push followed by one Queue.Pop per iteration,
+// running one sub-benchmark per entry of params. Each sub-benchmark creates its own
+// queue, closes it and removes its directory afterwards.
+func BenchmarkPushAndPop(b *testing.B) {
+	for _, param := range params {
+		name := fmt.Sprintf("segmentSize: %dKB maxInMemSegments:%d message:%dKB", param.segmentSize/1024, param.maxInMemSegments, param.message)
+		b.Run(name, func(b *testing.B) {
+			q, err := initMmapQueue(plugin.Config{
+				"queue_dir":           "BenchmarkPushAndPop",
+				"segment_size":        param.segmentSize,
+				"max_in_mem_segments": param.maxInMemSegments,
+			})
+			if err != nil {
+				b.Fatalf("cannot get a mmap queue: %v", err)
+			}
+			event := getLargeEvent(param.message)
+			b.ReportAllocs()
+			b.ResetTimer()
+			println()
+			for i := 0; i < b.N; i++ {
+				if err := q.Push(event); err != nil {
+					b.Fatalf("error in pushing: %v", err)
+				}
+				// Report pop failures as such; they were previously labelled as push errors.
+				if _, err := q.Pop(); err != nil {
+					b.Fatalf("error in popping: %v", err)
+				}
+			}
+			b.StopTimer()
+			// The close error is deliberately ignored: the queue directory is removed right after.
+			_ = q.Close()
+			cleanBenchmarkQueue(b, q)
+		})
+	}
+}
diff --git a/plugins/queue/mmap/meta/meta.go b/plugins/queue/mmap/meta/meta.go
new file mode 100644
index 0000000..cfa32a3
--- /dev/null
+++ b/plugins/queue/mmap/meta/meta.go
@@ -0,0 +1,153 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package meta
+
+import (
+	"fmt"
+	"syscall"
+
+	"path/filepath"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+const (
+	metaSize    = 80         // size in bytes requested for the metadata file: ten 8-byte fields
+	metaName    = "meta.dat" // file name of the metadata inside the metadata directory
+	metaVersion = 1          // the only non-zero layout version accepted by NewMetaData
+)
+
+// Metadata only needs 80B to store the Metadata for the pipe. But for memory alignment,
+// it takes at least one memory page size, which is generally 4K.
+//
+// [     8B    ][  8B   ][  8B   ][  8B   ][  8B   ][  8B   ][  8B   ][  8B   ][  8B   ][   8B   ]
+// [metaVersion][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][capacity]
+// [metaVersion][writing   offset][watermark offset][committed offset][reading   offset][capacity]
+type Metadata struct {
+	metaFile *mmap.File // memory mapped file holding the fields above as uint64 values
+	name     string     // set to metaName by NewMetaData
+	size     int        // set to metaSize by NewMetaData
+	capacity int        // capacity passed to NewMetaData
+}
+
+// NewMetaData read or create a Metadata with supported metaVersion
+func NewMetaData(metaDir string, capacity int) (*Metadata, error) {
+	path := filepath.Join(metaDir, metaName)
+	metaFile, err := segment.NewSegment(path, metaSize)
+	if err != nil {
+		return nil, fmt.Errorf("error in crating the Metadata memory mapped file: %v", err)
+	}
+
+	m := &Metadata{
+		metaFile: metaFile,
+		name:     metaName,
+		size:     metaSize,
+		capacity: capacity,
+	}
+
+	v := m.GetVersion()
+	if v != 0 && v != metaVersion {
+		return nil, fmt.Errorf("metadata metaVersion is not matching, the Metadata metaVersion is %d", v)
+	}
+	c := m.GetCapacity()
+	if c != 0 && c != capacity {
+		return nil, fmt.Errorf("metadata catapacity is not equal to the old capacity, the old capacity is %d", c)
+	}
+	m.PutVersion(metaVersion)
+	m.PutCapacity(int64(capacity))
+	return m, nil
+}
+
+// GetVersion reads the meta version stored at byte 0 of the mapped file.
+func (m *Metadata) GetVersion() int {
+	raw := m.metaFile.ReadUint64At(0)
+	return int(raw)
+}
+
+// PutVersion stores version at byte 0 of the mapped file.
+func (m *Metadata) PutVersion(version int64) {
+	raw := uint64(version)
+	m.metaFile.WriteUint64At(raw, 0)
+}
+
+// GetWritingOffset reads the writing position: the segment ID from byte 8 and the
+// offset inside that segment from byte 16.
+func (m *Metadata) GetWritingOffset() (segmentID, offset int64) {
+	segmentID = int64(m.metaFile.ReadUint64At(8))
+	offset = int64(m.metaFile.ReadUint64At(16))
+	return segmentID, offset
+}
+
+// PutWritingOffset stores the writing position: the segment ID at byte 8 and the
+// offset inside that segment at byte 16.
+func (m *Metadata) PutWritingOffset(segmentID, offset int64) {
+	id, pos := uint64(segmentID), uint64(offset)
+	m.metaFile.WriteUint64At(id, 8)
+	m.metaFile.WriteUint64At(pos, 16)
+}
+
+// GetWatermarkOffset reads the watermark position: the segment ID from byte 24 and
+// the offset inside that segment from byte 32.
+func (m *Metadata) GetWatermarkOffset() (segmentID, offset int64) {
+	segmentID = int64(m.metaFile.ReadUint64At(24))
+	offset = int64(m.metaFile.ReadUint64At(32))
+	return segmentID, offset
+}
+
+// PutWatermarkOffset stores the watermark position: the segment ID at byte 24 and
+// the offset inside that segment at byte 32.
+func (m *Metadata) PutWatermarkOffset(segmentID, offset int64) {
+	id, pos := uint64(segmentID), uint64(offset)
+	m.metaFile.WriteUint64At(id, 24)
+	m.metaFile.WriteUint64At(pos, 32)
+}
+
+// GetCommittedOffset reads the committed position: the segment ID from byte 40 and
+// the offset inside that segment from byte 48.
+func (m *Metadata) GetCommittedOffset() (segmentID, offset int64) {
+	segmentID = int64(m.metaFile.ReadUint64At(40))
+	offset = int64(m.metaFile.ReadUint64At(48))
+	return segmentID, offset
+}
+
+// PutCommittedOffset stores the committed position: the segment ID at byte 40 and
+// the offset inside that segment at byte 48.
+func (m *Metadata) PutCommittedOffset(segmentID, offset int64) {
+	id, pos := uint64(segmentID), uint64(offset)
+	m.metaFile.WriteUint64At(id, 40)
+	m.metaFile.WriteUint64At(pos, 48)
+}
+
+// GetReadingOffset reads the reading position: the segment ID from byte 56 and the
+// offset inside that segment from byte 64.
+func (m *Metadata) GetReadingOffset() (segmentID, offset int64) {
+	segmentID = int64(m.metaFile.ReadUint64At(56))
+	offset = int64(m.metaFile.ReadUint64At(64))
+	return segmentID, offset
+}
+
+// PutReadingOffset stores the reading position: the segment ID at byte 56 and the
+// offset inside that segment at byte 64.
+func (m *Metadata) PutReadingOffset(segmentID, offset int64) {
+	id, pos := uint64(segmentID), uint64(offset)
+	m.metaFile.WriteUint64At(id, 56)
+	m.metaFile.WriteUint64At(pos, 64)
+}
+
+// GetCapacity reads the queue capacity stored at byte 72 of the mapped file.
+func (m *Metadata) GetCapacity() int {
+	raw := m.metaFile.ReadUint64At(72)
+	return int(raw)
+}
+
+// PutCapacity stores the queue capacity at byte 72 of the mapped file.
+// The parameter was previously named "version", which did not match what is stored.
+func (m *Metadata) PutCapacity(capacity int64) {
+	m.metaFile.WriteUint64At(uint64(capacity), 72)
+}
+
+// Flush synchronously (MS_SYNC) flushes the memory mapped metadata file and returns
+// the error reported by the mapping, if any.
+func (m *Metadata) Flush() error {
+	err := m.metaFile.Flush(syscall.MS_SYNC)
+	return err
+}
+
+// Close flushes the metadata and then unmaps the file. When the flush fails its
+// error is returned and the file is not unmapped.
+func (m *Metadata) Close() error {
+	err := m.Flush()
+	if err == nil {
+		err = m.metaFile.Unmap()
+	}
+	return err
+}
diff --git a/plugins/queue/mmap/meta/meta_test.go b/plugins/queue/mmap/meta/meta_test.go
new file mode 100644
index 0000000..fcc6b57
--- /dev/null
+++ b/plugins/queue/mmap/meta/meta_test.go
@@ -0,0 +1,208 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package meta
+
+import (
+	"os"
+	"reflect"
+	"testing"
+)
+
+// args holds every value stored in the metadata file: the version, the capacity and
+// the segment ID/offset pair of each of the four positions.
+type args struct {
+	metaVersion int
+	capacity    int
+	// ID
+	writingSegID   int64
+	readingSegID   int64
+	committedSegID int64
+	watermarkSegID int64
+	// offset
+	writingSegOffset   int64
+	readingSegOffset   int64
+	committedSegOffset int64
+	watermarkSegOffset int64
+}
+
+// tests is one case of Test_newMetaData.
+type tests struct {
+	name    string // sub-test name
+	args    args   // values written into the metadata file before it is reopened
+	want    args   // values expected to be read back after reopening
+	wantErr bool   // when true, an error while reopening the file is accepted
+}
+
+// Test_newMetaData writes the values of each case into a fresh metadata file, closes
+// it, reopens it with the case capacity and compares what is read back with the
+// expected values. Cases with wantErr accept an error while reopening.
+func Test_newMetaData(t *testing.T) {
+	const testDir = "testMeta"
+	const preCapacity = 500
+	params := []tests{
+		buildMetaverionErrorTest(),
+		buildNormalTest(),
+		buildCapacitynErrorTest(),
+	}
+
+	for _, tt := range params {
+		t.Run(tt.name, func(t *testing.T) {
+			// clean
+			defer func() {
+				if err := os.RemoveAll(testDir); err != nil {
+					t.Errorf("remove Metadata dir error: %v", err)
+				}
+			}()
+
+			got, err := NewMetaData(testDir, preCapacity)
+			if err != nil {
+				t.Errorf("cannot create Metadata file: %v", err)
+				return
+			}
+
+			// write args
+			got.PutVersion(int64(tt.args.metaVersion))
+			got.PutWritingOffset(tt.args.writingSegID, tt.args.writingSegOffset)
+			got.PutReadingOffset(tt.args.readingSegID, tt.args.readingSegOffset)
+			got.PutCommittedOffset(tt.args.committedSegID, tt.args.committedSegOffset)
+			got.PutWatermarkOffset(tt.args.watermarkSegID, tt.args.watermarkSegOffset)
+			// Capture the Close error itself; the outer err is nil at this point.
+			if closeErr := got.Close(); closeErr != nil {
+				t.Errorf("cannot Close the Metadata file: %v", closeErr)
+				return
+			}
+
+			oldMeta, err := NewMetaData(testDir, tt.args.capacity)
+			if err != nil {
+				if tt.wantErr {
+					return
+				}
+				t.Errorf("cannot read old Metadata file: %v", err)
+				return
+			}
+			// Unmap the reopened file; this deferred call runs before the directory removal above.
+			defer func() {
+				if closeErr := oldMeta.Close(); closeErr != nil {
+					t.Errorf("cannot Close the reopened Metadata file: %v", closeErr)
+				}
+			}()
+
+			// read args
+			wmID, wmOffset := oldMeta.GetWatermarkOffset()
+			cID, cOffset := oldMeta.GetCommittedOffset()
+			rID, rOffset := oldMeta.GetReadingOffset()
+			wID, wOffset := oldMeta.GetWritingOffset()
+
+			readArgs := args{
+				metaVersion:        oldMeta.GetVersion(),
+				capacity:           oldMeta.GetCapacity(),
+				watermarkSegID:     wmID,
+				writingSegID:       wID,
+				readingSegID:       rID,
+				committedSegID:     cID,
+				watermarkSegOffset: wmOffset,
+				readingSegOffset:   rOffset,
+				committedSegOffset: cOffset,
+				writingSegOffset:   wOffset,
+			}
+			if !reflect.DeepEqual(readArgs, tt.want) {
+				t.Errorf("want meta info is [%+v]\n ,but got [%+v]", tt.want, readArgs)
+			}
+		})
+	}
+}
+
+// buildMetaverionErrorTest builds the case that stores meta version 2, for which an
+// error while reopening the file is accepted.
+func buildMetaverionErrorTest() tests {
+	values := args{
+		metaVersion:        2,
+		capacity:           500,
+		watermarkSegID:     1,
+		writingSegID:       2,
+		readingSegID:       3,
+		committedSegID:     4,
+		watermarkSegOffset: 10,
+		readingSegOffset:   20,
+		committedSegOffset: 30,
+		writingSegOffset:   40,
+	}
+	// The written and the expected values are the same for this case.
+	return tests{
+		name:    "wrong version",
+		args:    values,
+		want:    values,
+		wantErr: true,
+	}
+}
+
+// buildCapacitynErrorTest builds the case that reopens the metadata with capacity 600,
+// for which an error while reopening the file is accepted. The case has its own name
+// so it can be told apart from the wrong-version case in the test output.
+func buildCapacitynErrorTest() tests {
+	return tests{
+		name: "wrong capacity",
+		args: args{
+			metaVersion:        1,
+			capacity:           600,
+			watermarkSegID:     1,
+			writingSegID:       2,
+			readingSegID:       3,
+			committedSegID:     4,
+			watermarkSegOffset: 10,
+			readingSegOffset:   20,
+			committedSegOffset: 30,
+			writingSegOffset:   40,
+		},
+		want: args{
+			metaVersion:        1,
+			capacity:           600,
+			watermarkSegID:     1,
+			writingSegID:       2,
+			readingSegID:       3,
+			committedSegID:     4,
+			watermarkSegOffset: 10,
+			readingSegOffset:   20,
+			committedSegOffset: 30,
+			writingSegOffset:   40,
+		},
+		wantErr: true,
+	}
+}
+
+// buildNormalTest builds the success case: version 1 and capacity 500 are valid, so
+// reopening the file must not fail and every value must be read back unchanged.
+func buildNormalTest() tests {
+	return tests{
+		name: "correct version",
+		args: args{
+			metaVersion:        1,
+			capacity:           500,
+			watermarkSegID:     2,
+			writingSegID:       3,
+			readingSegID:       4,
+			committedSegID:     5,
+			watermarkSegOffset: 6,
+			readingSegOffset:   7,
+			committedSegOffset: 8,
+			writingSegOffset:   9,
+		},
+		want: args{
+			metaVersion:        1,
+			capacity:           500,
+			watermarkSegID:     2,
+			writingSegID:       3,
+			readingSegID:       4,
+			committedSegID:     5,
+			watermarkSegOffset: 6,
+			readingSegOffset:   7,
+			committedSegOffset: 8,
+			writingSegOffset:   9,
+		},
+		// No error is expected here; with true, a failure to reopen valid metadata went unreported.
+		wantErr: false,
+	}
+}
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
new file mode 100644
index 0000000..e699420
--- /dev/null
+++ b/plugins/queue/mmap/queue.go
@@ -0,0 +1,192 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"sync"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+	sync.Mutex
+	// config
+	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int    `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
+	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+
+	// running components
+	meta                   *meta.Metadata // The metadata file.
+	segments               []*mmap.File   // The data files.
+	mmapCount              int            // The number of the memory mapped files.
+	unflushedNum           int            // The unflushed number.
+	flushChannel           chan struct{}  // The flushChannel channel would receive a signal when the unflushedNum reach the flush_ceiling_num.
+	insufficientMemChannel chan struct{}  // Notify when memory is insufficient
+	sufficientMemChannel   chan struct{}  // Notify when memory is sufficient
+
+	// control components
+	ctx        context.Context    // Parent ctx
+	cancel     context.CancelFunc // Parent ctx cancel function
+	showDownWg sync.WaitGroup     // The shutdown wait group.
+
+	bufPool *sync.Pool // Pool of *bytes.Buffer created by Initialize.
+
+	encoder *Encoder // Serializes events in Push.
+	decoder *Decoder // Deserializes events in Pop.
+}
+
+// Name returns the plugin name "mmap-queue".
+func (q *Queue) Name() string {
+	return "mmap-queue"
+}
+
+// Description returns a short human-readable description of the plugin.
+func (q *Queue) Description() string {
+	return "this is a mmap queue"
+}
+
+// DefaultConfig returns the default settings of the queue as YAML text. The keys
+// match the mapstructure tags of the Queue configuration fields.
+func (q *Queue) DefaultConfig() string {
+	return `
+# The size of each segment. Default value is 128K. The unit is Byte.
+segment_size: 131072
+# The max num of segments in memory. Default value is 10.
+max_in_mem_segments: 10
+# The capacity of Queue = segment_size * queue_capacity_segments.
+queue_capacity_segments: 4000
+# The period flush time. The unit is ms. Default value is 1 second.
+flush_period: 1000
+# The max number in one flush time.  Default value is 10000.
+flush_ceiling_num: 10000
+# Contains all files in the queue.
+queue_dir: satellite-mmap-queue
+# The max size of the input event. Default value is 20k.
+max_event_size: 20480
+`
+}
+
+// Initialize prepares the queue for use.
+//
+// It normalizes the configuration (segment size rounded down to a multiple of the
+// page size, at least 4 in-memory segments, a positive flush period), loads the
+// metadata, resets the writing offset to the watermark offset and the reading offset
+// to the committed offset, maps the segments of both positions and finally starts the
+// segment swapper and the flush goroutines.
+//
+// When one of the initial segments cannot be mapped, everything mapped so far is
+// released and the error is returned.
+func (q *Queue) Initialize() error {
+	q.encoder = NewEncoder()
+	q.decoder = NewDecoder()
+
+	q.bufPool = &sync.Pool{New: func() interface{} {
+		return new(bytes.Buffer)
+	}}
+	// the size of each segment file should be a multiple of the page size.
+	pageSize := os.Getpagesize()
+	if q.SegmentSize%pageSize != 0 {
+		q.SegmentSize -= q.SegmentSize % pageSize
+	}
+	if q.SegmentSize/pageSize == 0 {
+		q.SegmentSize = 131072
+	}
+	// the minimum MaxInMemSegments value should be 4.
+	if q.MaxInMemSegments < 4 {
+		q.MaxInMemSegments = 4
+	}
+	// time.NewTicker panics on a non-positive interval, which would crash the flush
+	// goroutine; fall back to the default period of 1 second.
+	if q.FlushPeriod <= 0 {
+		q.FlushPeriod = 1000
+	}
+	// load metadata and override the reading or writing offset by the committed or watermark offset.
+	md, err := meta.NewMetaData(q.QueueDir, q.QueueCapacitySegments)
+	if err != nil {
+		return fmt.Errorf("error in creating the metadata: %v", err)
+	}
+	q.meta = md
+	cmID, cmOffset := md.GetCommittedOffset()
+	wmID, wmOffset := md.GetWatermarkOffset()
+	md.PutWritingOffset(wmID, wmOffset)
+	md.PutReadingOffset(cmID, cmOffset)
+	// keep the reading or writing segments in the memory.
+	q.segments = make([]*mmap.File, q.QueueCapacitySegments)
+	for _, id := range []int64{cmID, wmID} {
+		if _, err := q.GetSegment(id); err != nil {
+			// The queue cannot start: do not leave the segments or the metadata mapped.
+			for i, segment := range q.segments {
+				if segment == nil {
+					continue
+				}
+				if unmapErr := segment.Unmap(); unmapErr != nil {
+					log.Logger.Errorf("cannot unmap the segments: %d, %v", i, unmapErr)
+				}
+				q.segments[i] = nil
+			}
+			if closeErr := md.Close(); closeErr != nil {
+				log.Logger.Errorf("cannot unmap the metadata: %v", closeErr)
+			}
+			return err
+		}
+	}
+	// init components
+	q.insufficientMemChannel = make(chan struct{})
+	q.sufficientMemChannel = make(chan struct{})
+	q.flushChannel = make(chan struct{})
+	q.ctx, q.cancel = context.WithCancel(context.Background())
+	// async supported processes.
+	q.showDownWg.Add(2)
+	go q.segmentSwapper()
+	go q.flush()
+	return nil
+}
+
+// Push serializes e and appends it to the queue. It returns the serialization error,
+// an error when the serialized size exceeds MaxEventSize, or the error of the
+// underlying push.
+func (q *Queue) Push(e *event.Event) error {
+	data, err := q.encoder.serialize(e)
+	switch {
+	case err != nil:
+		return err
+	case len(data) > q.MaxEventSize:
+		return fmt.Errorf("cannot push the event to the queue because the size %dB is over ceiling", len(data))
+	}
+	return q.push(data)
+}
+
+// Pop reads the next record from the queue, deserializes it and returns it together
+// with the encoded position reached after the read. Errors of the read or of the
+// deserialization are returned unchanged.
+func (q *Queue) Pop() (*api.SequenceEvent, error) {
+	raw, segID, pos, err := q.pop()
+	if err != nil {
+		return nil, err
+	}
+	decoded, err := q.decoder.deserialize(raw)
+	if err != nil {
+		return nil, err
+	}
+	result := &api.SequenceEvent{
+		Event:  decoded,
+		Offset: q.encodeOffset(segID, pos),
+	}
+	return result, nil
+}
+
+// Close cancels the queue context, waits for the background goroutines to finish,
+// unmaps every mapped segment and closes the metadata. Failures are logged and the
+// returned error is always nil.
+func (q *Queue) Close() error {
+	q.cancel()
+	q.showDownWg.Wait()
+	for i := range q.segments {
+		if q.segments[i] == nil {
+			continue
+		}
+		if err := q.segments[i].Unmap(); err != nil {
+			log.Logger.Errorf("cannot unmap the segments: %d, %v", i, err)
+		}
+	}
+	err := q.meta.Close()
+	if err != nil {
+		log.Logger.Errorf("cannot unmap the metadata: %v", err)
+	}
+	return nil
+}
+
+// Ack records lastOffset as the committed offset of the queue.
+// An offset that cannot be decoded is logged and ignored; committing the zero values
+// produced by the failed decode would move the committed position back to the very
+// beginning of the queue.
+func (q *Queue) Ack(lastOffset event.Offset) {
+	id, offset, err := q.decodeOffset(lastOffset)
+	if err != nil {
+		log.Logger.Errorf("cannot ack queue with the offset:%s", lastOffset)
+		return
+	}
+	q.meta.PutCommittedOffset(id, offset)
+}
diff --git a/plugins/queue/mmap/queue_opreation.go b/plugins/queue/mmap/queue_opreation.go
new file mode 100644
index 0000000..b73321e
--- /dev/null
+++ b/plugins/queue/mmap/queue_opreation.go
@@ -0,0 +1,236 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+)
+
+const uInt64Size = 8 // bytes taken by the length prefix stored in front of each record
+
+// flush control the flush operation by timer or counter.
+//
+// It runs until the queue context is cancelled and calls doFlush whenever the flush
+// period elapses or a signal arrives on flushChannel. One more doFlush is done on
+// shutdown before the goroutine reports completion to the shutdown wait group.
+func (q *Queue) flush() {
+	defer q.showDownWg.Done()
+	ctx, cancel := context.WithCancel(q.ctx)
+	defer cancel()
+	// Use a single ticker for the whole lifetime of the goroutine. Creating a new one on
+	// every iteration left all previous tickers running because none was ever stopped.
+	timeTicker := time.NewTicker(time.Duration(q.FlushPeriod) * time.Millisecond)
+	defer timeTicker.Stop()
+	for {
+		select {
+		case <-q.flushChannel:
+			q.doFlush()
+		case <-timeTicker.C:
+			q.doFlush()
+		case <-ctx.Done():
+			q.doFlush()
+			return
+		}
+	}
+}
+
+// doFlush synchronously flushes every mapped segment, copies the writing offset into
+// the watermark offset and flushes the metadata. It holds the queue lock for the
+// whole operation; failures are logged only.
+func (q *Queue) doFlush() {
+	q.Lock()
+	defer q.Unlock()
+	for i := range q.segments {
+		mapped := q.segments[i]
+		if mapped != nil {
+			if err := mapped.Flush(syscall.MS_SYNC); err != nil {
+				log.Logger.Errorf("cannot flush segment file: %v", err)
+			}
+		}
+	}
+	id, offset := q.meta.GetWritingOffset()
+	q.meta.PutWatermarkOffset(id, offset)
+	err := q.meta.Flush()
+	if err != nil {
+		log.Logger.Errorf("cannot flush meta file: %v", err)
+	}
+}
+
+// push appends one record at the writing offset: first the length of the data, then
+// the data itself, which may continue in the following segment. Afterwards the
+// writing offset is advanced, and once FlushCeilingNum records are unflushed a signal
+// is sent on flushChannel and the counter is reset.
+// It returns an error when the queue is full or when writing fails.
+func (q *Queue) push(bytes []byte) error {
+	if q.isFull() {
+		return fmt.Errorf("cannot push data when the queue is full")
+	}
+	segID, pos := q.meta.GetWritingOffset()
+	segID, pos, err := q.writeLength(len(bytes), segID, pos)
+	if err != nil {
+		return err
+	}
+	if segID, pos, err = q.writeBytes(bytes, segID, pos); err != nil {
+		return err
+	}
+	q.meta.PutWritingOffset(segID, pos)
+	q.unflushedNum++
+	if q.unflushedNum != q.FlushCeilingNum {
+		return nil
+	}
+	q.flushChannel <- struct{}{}
+	q.unflushedNum = 0
+	return nil
+}
+
+// pop reads one record at the reading offset: first the length of the data, then the
+// data itself, which may continue in the following segment. On success the reading
+// offset is advanced and returned together with the data.
+// It returns an error when the queue is empty or when reading fails.
+func (q *Queue) pop() (data []byte, rid, roffset int64, err error) {
+	if q.isEmpty() {
+		return nil, 0, 0, fmt.Errorf("cannot read data when the queue is empty")
+	}
+	segID, pos := q.meta.GetReadingOffset()
+	segID, pos, size, err := q.readLength(segID, pos)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	data, segID, pos, err = q.readBytes(segID, pos, size)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	q.meta.PutReadingOffset(segID, pos)
+	return data, segID, pos, nil
+}
+
+// readBytes reads length bytes from the memory mapped segments, starting at offset of
+// segment id and moving on to the next segment whenever the end of one is reached.
+// It returns the bytes and the position right after them.
+func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, newOffset int64, err error) {
+	buf := make([]byte, length)
+	done := 0
+	for {
+		seg, err := q.GetSegment(id)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		n, err := seg.ReadAt(buf[done:], offset)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		done += n
+		offset += int64(n)
+		// The segment is used up: continue at the start of the next one.
+		if offset == int64(q.SegmentSize) {
+			id, offset = id+1, 0
+		}
+		if done == length {
+			return buf, id, offset, nil
+		}
+	}
+}
+
+// readLength reads the 8-byte length prefix of a record. When fewer than 8 bytes are
+// left in the segment, the prefix is read from the start of the next segment. It
+// returns the position right after the prefix and the length.
+func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length int, err error) {
+	segSize := int64(q.SegmentSize)
+	if offset+uInt64Size > segSize {
+		id, offset = id+1, 0
+	}
+	seg, err := q.GetSegment(id)
+	if err != nil {
+		return 0, 0, 0, err
+	}
+	length = int(seg.ReadUint64At(offset))
+	offset += uInt64Size
+	// The prefix ended exactly at the segment end: the data starts in the next segment.
+	if offset == segSize {
+		id, offset = id+1, 0
+	}
+	return id, offset, length, nil
+}
+
+// writeLength writes the 8-byte length prefix of a record. When fewer than 8 bytes
+// are left in the segment, the prefix is written at the start of the next segment.
+// It returns the position right after the prefix.
+func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int64, err error) {
+	segSize := int64(q.SegmentSize)
+	if offset+uInt64Size > segSize {
+		id, offset = id+1, 0
+	}
+	seg, err := q.GetSegment(id)
+	if err != nil {
+		return 0, 0, err
+	}
+	seg.WriteUint64At(uint64(length), offset)
+	offset += uInt64Size
+	// The prefix ended exactly at the segment end: the data starts in the next segment.
+	if offset == segSize {
+		id, offset = id+1, 0
+	}
+	return id, offset, nil
+}
+
+// writeBytes writes bytes into the memory mapped segments, starting at offset of
+// segment id and moving on to the next segment whenever the end of one is reached.
+// It returns the position right after the written bytes.
+func (q *Queue) writeBytes(bytes []byte, id, offset int64) (newID, newOffset int64, err error) {
+	total := len(bytes)
+	done := 0
+	for {
+		seg, err := q.GetSegment(id)
+		if err != nil {
+			return 0, 0, err
+		}
+		n, err := seg.WriteAt(bytes[done:], offset)
+		if err != nil {
+			return 0, 0, err
+		}
+		done += n
+		offset += int64(n)
+		// The segment is full: continue at the start of the next one.
+		if offset == int64(q.SegmentSize) {
+			id, offset = id+1, 0
+		}
+		if done == total {
+			return id, offset, nil
+		}
+	}
+}
+
+// isEmpty reports whether the reading offset has caught up with the writing offset,
+// i.e. both the segment IDs and the offsets inside the segment are equal.
+func (q *Queue) isEmpty() bool {
+	rid, roffset := q.meta.GetReadingOffset()
+	wid, woffset := q.meta.GetWritingOffset()
+	if rid != wid {
+		return false
+	}
+	return roffset == woffset
+}
+
+// isFull reports whether the writing segment ID has reached the highest ID allowed for the current reading segment ID.
+func (q *Queue) isFull() bool {
+	rid, _ := q.meta.GetReadingOffset()
+	wid, _ := q.meta.GetWritingOffset()
+	// ensure enough spaces to promise data stability.
+	maxWid := rid + int64(q.QueueCapacitySegments) - 1 - int64(q.MaxEventSize/q.SegmentSize)
+	return wid >= maxWid
+}
+
+// encodeOffset formats a segment ID and an offset inside that segment as the
+// decimal text "<id>-<offset>".
+func (q *Queue) encodeOffset(id, offset int64) event.Offset {
+	encoded := strconv.FormatInt(id, 10) + "-" + strconv.FormatInt(offset, 10)
+	return event.Offset(encoded)
+}
+
+// decodeOffset parses the text "<id>-<offset>" back into the segment ID and the
+// offset inside that segment. It returns an error when the text does not consist of
+// exactly two "-" separated parts or when a part is not a base-10 int64.
+func (q *Queue) decodeOffset(val event.Offset) (id, offset int64, err error) {
+	parts := strings.Split(string(val), "-")
+	if len(parts) != 2 {
+		return 0, 0, fmt.Errorf("the input offset string is illegal: %s", val)
+	}
+	if id, err = strconv.ParseInt(parts[0], 10, 64); err != nil {
+		return 0, 0, err
+	}
+	if offset, err = strconv.ParseInt(parts[1], 10, 64); err != nil {
+		return 0, 0, err
+	}
+	return id, offset, nil
+}
diff --git a/plugins/queue/mmap/queue_test.go b/plugins/queue/mmap/queue_test.go
new file mode 100644
index 0000000..915400e
--- /dev/null
+++ b/plugins/queue/mmap/queue_test.go
@@ -0,0 +1,402 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"bytes"
+	"fmt"
+	"os"
+	"reflect"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+// initMmapQueue registers the mmap queue plugin, fetches a queue for a config made of
+// the plugin name plus the entries of cfg, and initializes it.
+func initMmapQueue(cfg plugin.Config) (*Queue, error) {
+	log.Init(&log.LoggerConfig{})
+	plugin.RegisterPluginCategory(reflect.TypeOf((*api.Queue)(nil)).Elem())
+	plugin.RegisterPlugin(&Queue{})
+	var config plugin.Config = map[string]interface{}{
+		plugin.NameField: "mmap-queue",
+	}
+	for k, v := range cfg {
+		config[k] = v
+	}
+	q := api.GetQueue(config)
+	if q == nil {
+		return nil, fmt.Errorf("cannot get a default config mmap queue from the registry")
+	}
+	// keep the underlying cause so that a failing test shows why the queue is unusable.
+	if err := q.Initialize(); err != nil {
+		return nil, fmt.Errorf("queue cannot initialize: %v", err)
+	}
+	return q.(*Queue), nil
+}
+
+// cleanTestQueue removes the directory of the given test queue.
+// A nil queue (also a nil *Queue wrapped in the interface) is ignored, so the
+// function is safe to defer even when the queue could not be created.
+func cleanTestQueue(t *testing.T, q api.Queue) {
+	mq, ok := q.(*Queue)
+	if !ok || mq == nil {
+		return
+	}
+	if err := os.RemoveAll(mq.QueueDir); err != nil {
+		t.Errorf("cannot remove test queue dir, %v", err)
+	}
+}
+
+// getBatchEvents builds count log events whose name, meta and data values carry
+// the index of the event; all of them share the same 2000 bytes payload.
+func getBatchEvents(count int) []*event.Event {
+	payload := bytes.Repeat([]byte("a"), 2000)
+	var events []*event.Event
+	for i := 0; i < count; i++ {
+		suffix := strconv.Itoa(i)
+		e := &event.Event{
+			Name:      "event" + suffix,
+			Timestamp: time.Now(),
+			Meta:      map[string]string{"meta": "mval" + suffix},
+			Data: map[string]interface{}{
+				"data":  "dval" + suffix,
+				"bytes": payload,
+				"index": i,
+			},
+			Type:   event.LogEvent,
+			Remote: true,
+		}
+		events = append(events, e)
+	}
+	return events
+}
+
+func getNKData(n int) []byte {
+	return bytes.Repeat([]byte("a"), 1024*n)
+}
+
+// getLargeEvent builds a log event whose "data" entry holds the result of getNKData(n).
+func getLargeEvent(n int) *event.Event {
+	large := new(event.Event)
+	large.Name = "largeEvent"
+	large.Timestamp = time.Now()
+	large.Meta = map[string]string{"meta": "largeEvent"}
+	large.Data = map[string]interface{}{"data": getNKData(n)}
+	large.Type = event.LogEvent
+	large.Remote = true
+	return large
+}
+
+// TestQueue_Normal pushes a batch of events and checks that they are popped in
+// the order they were pushed.
+func TestQueue_Normal(t *testing.T) {
+	q, err := initMmapQueue(plugin.Config{
+		"queue_dir": "TestQueue_Normal",
+	})
+	if err != nil {
+		t.Fatalf("error in initializing the mmap queue: %v", err)
+	}
+	// register the cleanup only once the queue is known to be valid, a deferred
+	// cleanup of a nil queue would dereference a nil pointer.
+	defer cleanTestQueue(t, q)
+	events := getBatchEvents(10)
+	for _, e := range events {
+		if err = q.Push(e); err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+	for i := range events {
+		sequenceEvent, err := q.Pop()
+		if err != nil {
+			t.Errorf("error in fetching data from queue: %v", err)
+		} else if !cmp.Equal(events[i], sequenceEvent.Event) {
+			t.Errorf("history data and fetching data is not equal\n,history:%+v\n. pop data:%+v\n", events[i], sequenceEvent.Event)
+		}
+	}
+}
+
+// TestQueue_ReadHistory checks that events survive closing and reopening the queue:
+// the events are pushed in several rounds, each on a freshly reopened queue, and are
+// popped and acknowledged the same way.
+func TestQueue_ReadHistory(t *testing.T) {
+	cfg := plugin.Config{
+		"queue_dir":    "TestQueue_ReadHistory",
+		"segment_size": 10240,
+	}
+
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("error in initializing the mmap queue: %v", err)
+	}
+	// register the cleanup only once the queue is known to be valid, a deferred
+	// cleanup of a nil queue would dereference a nil pointer.
+	defer cleanTestQueue(t, q)
+	// close the queue to create a history empty queue.
+	if err := q.Close(); err != nil {
+		t.Fatalf("error in closing queue, %v", err)
+	}
+
+	// test cases.
+	batchSize := 10
+	batchNum := 100
+	events := getBatchEvents(batchSize * batchNum)
+
+	// Insert batchNum events in each of the batchSize rounds.
+	for i := 0; i < batchSize; i++ {
+		// recreate the queue
+		q, err := initMmapQueue(cfg)
+		if err != nil {
+			t.Fatalf("error in initializing the mmap queue: %v", err)
+		}
+		for j := 0; j < batchNum; j++ {
+			// every round owns its own range of batchNum events, so that all
+			// of the generated events are used exactly once.
+			index := i*batchNum + j
+			if err := q.Push(events[index]); err != nil {
+				t.Errorf("queue cannot push one event: %+v", err)
+			}
+		}
+		if err := q.Close(); err != nil {
+			t.Fatalf("error in closing queue, %v", err)
+		}
+	}
+
+	// Read batchNum events in each of the batchSize rounds.
+	for i := 0; i < batchSize; i++ {
+		// recreate the queue
+		q, err := initMmapQueue(cfg)
+		if err != nil {
+			t.Fatalf("error in initializing the mmap queue: %v", err)
+		}
+		for j := 0; j < batchNum; j++ {
+			index := i*batchNum + j
+			sequenceEvent, err := q.Pop()
+			if err != nil {
+				t.Errorf("error in fetching data from queue: %v", err)
+			} else if cmp.Equal(events[index], sequenceEvent.Event) {
+				q.Ack(sequenceEvent.Offset)
+			} else {
+				t.Errorf("history data and fetching data is not equal\n,history:%+v\n. pop data:%+v\n", events[index], sequenceEvent.Event)
+			}
+		}
+		if err := q.Close(); err != nil {
+			t.Fatalf("error in closing queue, %v", err)
+		}
+	}
+}
+
+// TestQueue_PushOverCeilingMsg expects the push of a 20 KiB event to fail when
+// max_event_size is 8 KiB.
+func TestQueue_PushOverCeilingMsg(t *testing.T) {
+	oversized := getLargeEvent(20)
+	q, err := initMmapQueue(plugin.Config{
+		"queue_dir":      "TestQueue_PushOverCeilingMsg",
+		"segment_size":   10240,
+		"max_event_size": 1024 * 8,
+	})
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	if err = q.Push(oversized); err == nil {
+		t.Fatalf("The insertion of the over ceiling event is not as expected")
+	}
+	fmt.Printf("want err: %v", err)
+}
+
+// TestQueue_FlushWhenReachNum pushes as many events as flush_ceiling_num and expects
+// the watermark offset to equal the writing offset one second later.
+func TestQueue_FlushWhenReachNum(t *testing.T) {
+	q, err := initMmapQueue(plugin.Config{
+		"queue_dir":         "TestQueue_FlushWhenReachNum",
+		"segment_size":      10240,
+		"flush_ceiling_num": 5,
+		"flush_period":      1000 * 60,
+	})
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+
+	for _, e := range getBatchEvents(5) {
+		if pushErr := q.Push(e); pushErr != nil {
+			t.Errorf("queue cannot push one event: %+v", pushErr)
+		}
+	}
+	time.Sleep(time.Second)
+	writeID, writeOffset := q.meta.GetWritingOffset()
+	markID, markOffset := q.meta.GetWatermarkOffset()
+	if flushed := writeID == markID && writeOffset == markOffset; !flushed {
+		t.Fatalf("the flush operation was not invoking when reach the flush_ceiling_num.")
+	}
+}
+
+// TestQueue_FlushPeriod pushes fewer events than flush_ceiling_num and expects the
+// watermark offset to equal the writing offset after waiting two seconds, which is
+// longer than the configured flush_period value of 1000.
+func TestQueue_FlushPeriod(t *testing.T) {
+	cfg := plugin.Config{
+		"queue_dir":         "TestQueue_FlushPeriod",
+		"segment_size":      10240,
+		"flush_ceiling_num": 50,
+		"flush_period":      1000 * 1,
+	}
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	events := getBatchEvents(5)
+
+	for _, e := range events {
+		err = q.Push(e)
+		if err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+	time.Sleep(time.Second * 2)
+	wID, wOffset := q.meta.GetWritingOffset()
+	wmID, wmOffset := q.meta.GetWatermarkOffset()
+	if wID != wmID || wOffset != wmOffset {
+		// the ceiling number is never reached here, only the period can trigger the flush.
+		t.Fatalf("the flush operation was not invoked when the flush_period elapsed.")
+	}
+}
+
+// TestQueue_MemCost records the number of mapped segments after every push and
+// compares the recorded trend with the expected one.
+func TestQueue_MemCost(t *testing.T) {
+	q, err := initMmapQueue(plugin.Config{
+		"queue_dir":           "TestQueue_MemCost",
+		"segment_size":        1024 * 4,
+		"max_in_mem_segments": 8,
+	})
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+
+	var got []int
+	for _, e := range getBatchEvents(20) {
+		pushErr := q.Push(e)
+		got = append(got, q.mmapCount)
+		if pushErr != nil {
+			t.Errorf("queue cannot push one event: %+v", pushErr)
+		}
+	}
+	want := []int{
+		1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 5, 6, 6, 7, 7, 8,
+	}
+	if !cmp.Equal(want, got) {
+		t.Fatalf("the memory cost trends are not in line with expectations,\n want: %v,\n but got: %v", want, got)
+	}
+}
+
+// TestQueue_OverSegmentEvent pushes one event that is larger than a segment and
+// checks the ID of the segment the writing offset ends up in.
+func TestQueue_OverSegmentEvent(t *testing.T) {
+	cfg := plugin.Config{
+		"queue_dir":    "TestQueue_OverSegmentEvent",
+		"segment_size": 1024 * 4,
+	}
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	size := 10
+	wantPos := size * 1024 / q.SegmentSize
+	largeEvent := getLargeEvent(size)
+	err = q.Push(largeEvent)
+	if err != nil {
+		t.Errorf("queue cannot push one event: %+v", err)
+	}
+	id, _ := q.meta.GetWritingOffset()
+	if int(id) != wantPos {
+		// the expected segment comes first in the message, the actual one second.
+		t.Fatalf("the writing offset id should at %d segment, but at %d", wantPos, id)
+	}
+}
+
+// TestQueue_ReusingFiles pushes and pops 100 events one by one and expects both the
+// reading and the writing segment ID to end up above queue_capacity_segments.
+func TestQueue_ReusingFiles(t *testing.T) {
+	q, err := initMmapQueue(plugin.Config{
+		"queue_dir":               "TestQueue_ReusingFiles",
+		"segment_size":            1024 * 4,
+		"queue_capacity_segments": 5,
+		"max_event_size":          1024 * 3,
+	})
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+
+	for round := 0; round < 100; round++ {
+		if pushErr := q.Push(getLargeEvent(2)); pushErr != nil {
+			t.Errorf("queue cannot push one event: %+v", pushErr)
+		}
+		if _, popErr := q.Pop(); popErr != nil {
+			t.Errorf("error in fetching data from queue: %v", popErr)
+		}
+	}
+	rid, roffset := q.meta.GetReadingOffset()
+	wid, woffset := q.meta.GetWritingOffset()
+	fmt.Printf("rid:%d, roffset:%d, wid:%d, woffset:%d\n", rid, roffset, wid, woffset)
+	capacity := q.QueueCapacitySegments
+	if reused := int(wid) > capacity && int(rid) > capacity; !reused {
+		t.Errorf("cannot valid reusing files")
+	}
+}
+
+// TestQueue_Empty pops every pushed event and then pops once more from the drained queue.
+func TestQueue_Empty(t *testing.T) {
+	cfg := plugin.Config{
+		// a dedicated directory keeps this test from working on the files of another test.
+		"queue_dir":               "TestQueue_Empty",
+		"segment_size":            1024 * 4,
+		"queue_capacity_segments": 10,
+	}
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	for _, e := range getBatchEvents(3) {
+		err = q.Push(e)
+		if err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+	for i := 0; i < 3; i++ {
+		if _, err = q.Pop(); err != nil {
+			t.Errorf("error in fetching data from queue: %v", err)
+		}
+	}
+	// NOTE(review): a nil error passes this check as well, so it does not prove that
+	// popping from an empty queue fails - confirm the intended contract of Pop.
+	_, err = q.Pop()
+	if err != nil && err.Error() != "cannot read data when the queue is empty" {
+		t.Fatalf("not except err: %v", err)
+	}
+}
+
+// TestQueue_Full pushes 8 events without popping any and then pushes one more event.
+func TestQueue_Full(t *testing.T) {
+	cfg := plugin.Config{
+		// a dedicated directory keeps this test from working on the files of another test.
+		"queue_dir":               "TestQueue_Full",
+		"segment_size":            1024 * 4,
+		"queue_capacity_segments": 10,
+	}
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	for _, e := range getBatchEvents(8) {
+		err = q.Push(e)
+		if err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+	// NOTE(review): a nil error passes this check as well, so it does not prove that
+	// the last push is rejected - confirm the intended contract of Push.
+	err = q.Push(getLargeEvent(2))
+	if err != nil && err.Error() != "cannot push data when the queue is full" {
+		t.Fatalf("not except err: %v", err)
+	}
+}
diff --git a/plugins/queue/mmap/segment/segment.go b/plugins/queue/mmap/segment/segment.go
new file mode 100644
index 0000000..cf738d1
--- /dev/null
+++ b/plugins/queue/mmap/segment/segment.go
@@ -0,0 +1,68 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package segment
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"syscall"
+
+	"github.com/grandecola/mmap"
+)
+
+const FileSuffix = "_segment.dat"
+
+// NewSegment returns a pointer to a memory mapped file according to the given file name and file size.
+// Missing parent directories and a missing file are created, and the file is resized to size before
+// it is mapped. The file descriptor is closed before returning, on success as well as on failure.
+// The size of each segment file should be a multiple of the page size.
+func NewSegment(name string, size int) (*mmap.File, error) {
+	name, err := filepath.Abs(name)
+	if err != nil {
+		return nil, fmt.Errorf("error in getting the absolute path of the segment file : %v", err)
+	}
+	// MkdirAll does nothing for an existing directory, so no preceding Stat is needed
+	// and the reported error is the one of the failing operation.
+	if err := os.MkdirAll(filepath.Dir(name), 0744); err != nil {
+		return nil, fmt.Errorf("error in creating the parent dirs of the segment file : %v", err)
+	}
+
+	file, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0744)
+	if err != nil {
+		return nil, fmt.Errorf("error in opening segment file : %v", err)
+	}
+
+	segment, err := resizeAndMap(file, size)
+	// release the descriptor on every path, also when resizing or mapping failed.
+	closeErr := file.Close()
+	if err != nil {
+		return nil, err
+	}
+	if closeErr != nil {
+		// the caller only gets an error back, so the mapping must not be left behind;
+		// the close error is the one worth reporting.
+		_ = segment.Unmap()
+		return nil, fmt.Errorf("error in closing the segment file : %v", closeErr)
+	}
+	return segment, nil
+}
+
+// resizeAndMap makes sure the file has exactly the given size and maps it for reading and writing.
+func resizeAndMap(file *os.File, size int) (*mmap.File, error) {
+	stat, err := file.Stat()
+	if err != nil {
+		return nil, fmt.Errorf("error in reading info of the segment file : %v", err)
+	}
+
+	if stat.Size() != int64(size) {
+		if err := file.Truncate(int64(size)); err != nil {
+			return nil, fmt.Errorf("error in truncating file: %v", err)
+		}
+	}
+
+	segment, err := mmap.NewSharedFileMmap(file, 0, size, syscall.PROT_READ|syscall.PROT_WRITE)
+	if err != nil {
+		return nil, fmt.Errorf("error in creating the mmap segment file : %v", err)
+	}
+	return segment, nil
+}
diff --git a/plugins/queue/mmap/segment/segment_test.go b/plugins/queue/mmap/segment/segment_test.go
new file mode 100644
index 0000000..98c38e6
--- /dev/null
+++ b/plugins/queue/mmap/segment/segment_test.go
@@ -0,0 +1,187 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package segment
+
+import (
+	"os"
+	"testing"
+
+	"github.com/grandecola/mmap"
+)
+
+// Test_newSegmentWithOldFile creates a file of a given original size, maps it with
+// NewSegment and checks that the file ends up with the needed size.
+func Test_newSegmentWithOldFile(t *testing.T) {
+	type args struct {
+		fileName     string
+		originalSize int
+		neededSize   int
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    int64
+		wantErr bool
+	}{
+		{
+			name: "less than the needed size",
+			args: args{
+				fileName:     "temp.segment",
+				originalSize: os.Getpagesize(),
+				neededSize:   os.Getpagesize() * 2,
+			},
+			want:    int64(os.Getpagesize() * 2),
+			wantErr: false,
+		},
+		{
+			name: "equal to the needed size",
+			args: args{
+				fileName:     "temp2.segment",
+				originalSize: os.Getpagesize() * 2,
+				neededSize:   os.Getpagesize() * 2,
+			},
+			want:    int64(os.Getpagesize() * 2),
+			wantErr: false,
+		},
+		{
+			name: "larger than the needed size",
+			args: args{
+				fileName:     "temp3.segment",
+				originalSize: os.Getpagesize() * 3,
+				neededSize:   os.Getpagesize() * 2,
+			},
+			want:    int64(os.Getpagesize() * 2),
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			file, err := os.Create(tt.args.fileName)
+			if err != nil {
+				t.Errorf("cannot create the original file: %v", err)
+				return
+			}
+			if err := file.Truncate(int64(tt.args.originalSize)); err != nil {
+				t.Errorf("cannot set the original file size: %v", err)
+			}
+			// the original file is only needed on disk, do not keep its descriptor open.
+			if err := file.Close(); err != nil {
+				t.Errorf("cannot close the original file: %v", err)
+			}
+
+			got, err := NewSegment(tt.args.fileName, tt.args.neededSize)
+			defer clean(got, tt.args.fileName, t)
+
+			if (err != nil) != tt.wantErr {
+				t.Errorf("NewSegment() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			// os.Stat reads the size without opening a descriptor that had to be closed again.
+			if stat, err := os.Stat(tt.args.fileName); err != nil {
+				t.Errorf("cannot read mmap file info: %v", err)
+			} else if stat.Size() != tt.want {
+				t.Errorf("want file size is %d ,but got %d", tt.want, stat.Size())
+			}
+		})
+	}
+}
+
+// Test_newSegmentSize maps a not yet existing file with NewSegment and checks
+// that the created file has the requested size.
+func Test_newSegmentSize(t *testing.T) {
+	type args struct {
+		fileName string
+		size     int
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    int64
+		wantErr bool
+	}{
+		{
+			name: "equal to page size",
+			args: args{
+				fileName: "temp2.segment",
+				size:     os.Getpagesize() * 2,
+			},
+			want:    int64(os.Getpagesize() * 2),
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := NewSegment(tt.args.fileName, tt.args.size)
+			defer clean(got, tt.args.fileName, t)
+
+			if (err != nil) != tt.wantErr {
+				t.Errorf("NewSegment() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			// os.Stat reads the size without opening a descriptor that had to be closed again.
+			if stat, err := os.Stat(tt.args.fileName); err != nil {
+				t.Errorf("cannot read mmap file info: %v", err)
+			} else if stat.Size() != tt.want {
+				t.Errorf("want file size is %d ,but got %d", tt.want, stat.Size())
+			}
+		})
+	}
+}
+
+// Test_newSegmentMultiDir checks that NewSegment succeeds for a segment file that
+// is placed under a sub directory; the directory is removed afterwards.
+func Test_newSegmentMultiDir(t *testing.T) {
+	const testDir = "testQueue"
+	defer func() {
+		if err := os.RemoveAll(testDir); err != nil {
+			t.Errorf("cannot clean the testqueue dir: %v", err)
+		}
+	}()
+
+	cases := []struct {
+		name     string
+		fileName string
+		size     int
+		wantErr  bool
+	}{
+		{
+			name:     "test multi dir",
+			fileName: testDir + "/temp.segment",
+			size:     10,
+			wantErr:  false,
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			seg, err := NewSegment(tc.fileName, tc.size)
+			defer clean(seg, tc.fileName, t)
+			if failed := err != nil; failed != tc.wantErr {
+				t.Errorf("NewSegment() error = %v, wantErr %v", err, tc.wantErr)
+			}
+		})
+	}
+}
+
+// clean unmaps the given segment and deletes its backing file.
+// Nothing is done, not even the deletion, when the segment is nil.
+func clean(file *mmap.File, fileName string, t *testing.T) {
+	if file == nil {
+		return
+	}
+	unmapErr := file.Unmap()
+	if unmapErr != nil {
+		t.Errorf("unmap segment file error: %v", unmapErr)
+	}
+	removeErr := os.Remove(fileName)
+	if removeErr != nil {
+		t.Errorf("delete segment file error: %v", removeErr)
+	}
+}
diff --git a/plugins/queue/mmap/segment_operation.go b/plugins/queue/mmap/segment_operation.go
new file mode 100644
index 0000000..9aa24d9
--- /dev/null
+++ b/plugins/queue/mmap/segment_operation.go
@@ -0,0 +1,142 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"strconv"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns the memory mapped file of the given segment and maps it first
+// when needed. When the number of mapped segments has reached MaxInMemSegments, it
+// signals the insufficient-memory channel and waits for the answer on the
+// sufficient-memory channel before mapping.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
+	slot := q.GetIndex(segmentID)
+	if q.mmapCount >= q.MaxInMemSegments {
+		q.insufficientMemChannel <- struct{}{}
+		<-q.sufficientMemChannel
+	}
+	if err := q.mapSegment(segmentID); err != nil {
+		return nil, err
+	}
+	mapped := q.segments[slot]
+	if mapped == nil {
+		return nil, fmt.Errorf("cannot get a memory mapped file at %d segment", segmentID)
+	}
+	return mapped, nil
+}
+
+// mapSegment memory-maps the file of the given segment and stores it in q.segments,
+// unless the segment is mapped already. The file is named after the index of the
+// segment and lives in QueueDir.
+func (q *Queue) mapSegment(segmentID int64) error {
+	slot := q.GetIndex(segmentID)
+	if q.segments[slot] == nil {
+		name := strconv.Itoa(slot) + segment.FileSuffix
+		mapped, err := segment.NewSegment(path.Join(q.QueueDir, name), q.SegmentSize)
+		if err != nil {
+			return err
+		}
+		q.segments[slot] = mapped
+		q.mmapCount++
+	}
+	return nil
+}
+
+// unmapSegment unmaps the given segment and clears its entry in q.segments.
+// A segment that is not mapped is left alone.
+func (q *Queue) unmapSegment(segmentID int64) error {
+	slot := q.GetIndex(segmentID)
+	mapped := q.segments[slot]
+	if mapped == nil {
+		return nil
+	}
+	if err := mapped.Unmap(); err != nil {
+		return fmt.Errorf("error in unmap segemnt: %v", err)
+	}
+	q.segments[slot] = nil
+	q.mmapCount--
+	return nil
+}
+
+// segmentSwapper run with a go routine to ensure the memory cost.
+// It waits for signals on the insufficient-memory channel, swaps segments out when the
+// number of mapped segments has reached MaxInMemSegments, and answers every signal on
+// the sufficient-memory channel. It returns when the context derived from q.ctx is done.
+func (q *Queue) segmentSwapper() {
+	defer q.showDownWg.Done()
+	ctx, cancel := context.WithCancel(q.ctx)
+	defer cancel()
+	for {
+		select {
+		case <-q.insufficientMemChannel:
+			if q.mmapCount >= q.MaxInMemSegments {
+				// report the cause as well, the bare message does not tell why swapping failed.
+				if err := q.doSwap(); err != nil {
+					log.Logger.Errorf("cannot get enough memory to receive new data: %v", err)
+				}
+			}
+			// answer in any case, the sender of the signal waits for this notification.
+			q.sufficientMemChannel <- struct{}{}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// doSwap swap the memory mapped files to normal files to promise the memory resources cost.
+// Candidates are the MaxInMemSegments segments preceding the reading segment, visited in
+// ascending order, and then the MaxInMemSegments segments preceding the writing segment,
+// visited in descending order. The reading and the writing segment are never unmapped.
+// It returns as soon as enough segments are unmapped, and fails when a segment cannot be
+// unmapped or when no candidate is left to unmap.
+func (q *Queue) doSwap() error {
+	rID, _ := q.meta.GetReadingOffset()
+	wID, _ := q.meta.GetWritingOffset()
+	// shift the IDs by one capacity, so that the candidate ranges below stay non-negative
+	// for small IDs while mapping onto the same indexes.
+	logicRID := rID + int64(q.QueueCapacitySegments)
+	logicWID := wID + int64(q.QueueCapacitySegments)
+	wIndex := q.GetIndex(wID)
+	rIndex := q.GetIndex(rID)
+	for q.mmapCount >= q.MaxInMemSegments {
+		mappedBefore := q.mmapCount
+		for i := logicRID - int64(q.MaxInMemSegments); i >= 0 && i < logicRID; i++ {
+			if q.GetIndex(i) == wIndex || q.GetIndex(i) == rIndex {
+				continue
+			}
+
+			if err := q.unmapSegment(i); err != nil {
+				return err
+			}
+			// the writing segment and the reading segment should still in memory.
+			// q.MaxInMemSegments/2-1 means keeping half available spaces to receive new data.
+			if q.MaxInMemSegments-q.mmapCount >= q.MaxInMemSegments/2-1 {
+				return nil
+			}
+		}
+		for i := logicWID - 1; i >= 0 && i >= logicWID-int64(q.MaxInMemSegments); i-- {
+			if q.GetIndex(i) == wIndex || q.GetIndex(i) == rIndex {
+				continue
+			}
+
+			if err := q.unmapSegment(i); err != nil {
+				return err
+			}
+			// the writing segment and the reading segment should still in memory.
+			// q.MaxInMemSegments/2-1 means keeping half available spaces to receive new data.
+			if q.MaxInMemSegments-q.mmapCount >= q.MaxInMemSegments/2-1 {
+				return nil
+			}
+		}
+		// the candidates do not change between two passes, so a pass that unmapped
+		// nothing would be repeated forever.
+		if q.mmapCount == mappedBefore {
+			return fmt.Errorf("cannot swap out any segment, %d segments are mapped and the limit is %d",
+				q.mmapCount, q.MaxInMemSegments)
+		}
+	}
+	return nil
+}
+
+// GetIndex converts a segment ID into an index by taking it modulo QueueCapacitySegments.
+func (q *Queue) GetIndex(segmentID int64) int {
+	capacity := q.QueueCapacitySegments
+	return int(segmentID) % capacity
+}
diff --git a/plugins/queue/mmap/serializer.go b/plugins/queue/mmap/serializer.go
new file mode 100644
index 0000000..38a5dd1
--- /dev/null
+++ b/plugins/queue/mmap/serializer.go
@@ -0,0 +1,74 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"bytes"
+
+	"encoding/gob"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+)
+
+// Decoder used in pop operation for reusing gob.Decoder and buf.
+type Decoder struct {
+	// buf receives the serialized bytes that are about to be decoded.
+	buf *bytes.Buffer
+	// decoder is the reused gob decoder.
+	decoder *gob.Decoder
+}
+
+// Encoder used in push operation for reusing gob.Encoder and buf.
+type Encoder struct {
+	// buf collects the bytes produced by the encoder.
+	buf *bytes.Buffer
+	// encoder is the reused gob encoder.
+	encoder *gob.Encoder
+}
+
+// NewDecoder creates a Decoder whose gob decoder reads from the Decoder's own buffer.
+func NewDecoder() *Decoder {
+	d := &Decoder{buf: new(bytes.Buffer)}
+	d.decoder = gob.NewDecoder(d.buf)
+	return d
+}
+
+// NewEncoder creates an Encoder whose gob encoder writes into the Encoder's own buffer.
+func NewEncoder() *Encoder {
+	e := &Encoder{buf: new(bytes.Buffer)}
+	e.encoder = gob.NewEncoder(e.buf)
+	return e
+}
+
+// deserialize feeds b to the reused gob decoder and returns the decoded event.
+// The buffer is reset afterwards, no matter whether decoding succeeded.
+func (d *Decoder) deserialize(b []byte) (*event.Event, error) {
+	defer d.buf.Reset()
+	d.buf.Write(b)
+	decoded := new(event.Event)
+	if err := d.decoder.Decode(decoded); err != nil {
+		return nil, err
+	}
+	return decoded, nil
+}
+
+// serialize encodes the event with the reused gob encoder and returns the produced bytes.
+// The returned slice is a copy owned by the caller. The buffer is reset afterwards, no
+// matter whether encoding succeeded.
+func (e *Encoder) serialize(data *event.Event) ([]byte, error) {
+	defer e.buf.Reset()
+	if err := e.encoder.Encode(data); err != nil {
+		return nil, err
+	}
+	// Bytes() aliases the storage of the buffer, which is reused after the deferred Reset;
+	// without the copy the next serialize call would overwrite the returned data.
+	encoded := make([]byte, e.buf.Len())
+	copy(encoded, e.buf.Bytes())
+	return encoded, nil
+}
diff --git a/plugins/queue/api/queue_repository.go b/plugins/queue/queue_repository.go
similarity index 76%
rename from plugins/queue/api/queue_repository.go
rename to plugins/queue/queue_repository.go
index 11031d0..1c49822 100644
--- a/plugins/queue/api/queue_repository.go
+++ b/plugins/queue/queue_repository.go
@@ -15,26 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package api
+package queue
 
 import (
 	"reflect"
 
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap"
 )
 
-// GetQueue an initialized filter plugin.
-func GetQueue(config plugin.Config) Queue {
-	return plugin.Get(reflect.TypeOf((*Queue)(nil)).Elem(), config).(Queue)
-}
-
 // RegisterQueuePlugins register the used queue plugins.
 func RegisterQueuePlugins() {
-	plugin.RegisterPluginCategory(reflect.TypeOf((*Queue)(nil)).Elem())
-	queues := []Queue{
+	plugin.RegisterPluginCategory(reflect.TypeOf((*api.Queue)(nil)).Elem())
+	queues := []api.Queue{
+		&mmap.Queue{},
 		// Please register the queue plugins at here.
 	}
-	for _, queue := range queues {
-		plugin.RegisterPlugin(queue)
+	for _, q := range queues {
+		plugin.RegisterPlugin(q)
 	}
 }
diff --git a/plugins/receiver/api/receiver.go b/plugins/receiver/api/receiver.go
index 9e47692..ab04d9c 100644
--- a/plugins/receiver/api/receiver.go
+++ b/plugins/receiver/api/receiver.go
@@ -31,5 +31,5 @@ type Receiver interface {
 	RegisterHandler(server api.Server)
 
 	// Channel would be put a data when the receiver receives an APM data.
-	Channel() <-chan event.SerializableEvent
+	Channel() <-chan *event.Event
 }