You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2020/12/16 07:54:25 UTC

[skywalking-satellite] branch mmap-queue created (now f2dfcfe)

This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a change to branch mmap-queue
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git.


      at f2dfcfe  mmap-queue-plugin

This branch includes the following new commits:

     new f2dfcfe  mmap-queue-plugin

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-satellite] 01/01: mmap-queue-plugin

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch mmap-queue
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git

commit f2dfcfe1cf6a4881bf5a9d3537cb3661fbe5ee6b
Author: Evan <ev...@outlook.com>
AuthorDate: Wed Dec 16 15:53:59 2020 +0800

    mmap-queue-plugin
---
 Makefile                                           |   5 +-
 dist/LICENSE                                       |   3 +-
 dist/licenses/LICENSE-go-cmp                       |  27 ++
 dist/licenses/LICENSE-mmap                         |  21 ++
 docs/configuration/queue.md                        |  11 +
 docs/plugins/queue/mmap/README.md                  |  66 ++++
 go.mod                                             |   3 +-
 go.sum                                             |  18 +-
 internal/pkg/event/event.go                        |  53 +--
 internal/satellite/event/event.go                  | 109 ------
 .../satellite/module/gatherer/fetcher_gatherer.go  |  26 +-
 .../satellite/module/gatherer/receiver_gatherer.go |  25 +-
 internal/satellite/module/processor/processor.go   |   2 +-
 internal/satellite/module/sender/sender.go         |   4 +-
 plugins/fetcher/api/fetcher.go                     |   2 +-
 plugins/init.go                                    |   2 +-
 plugins/parser/api/parser.go                       |   4 +-
 plugins/queue/api/queue.go                         |  17 +-
 plugins/queue/mmap/README.md                       |   1 -
 plugins/queue/mmap/branchmark_test.go              | 109 ++++++
 plugins/queue/mmap/meta/meta.go                    | 153 ++++++++
 plugins/queue/mmap/meta/meta_test.go               | 208 +++++++++++
 plugins/queue/mmap/queue.go                        | 192 ++++++++++
 plugins/queue/mmap/queue_opreation.go              | 236 ++++++++++++
 plugins/queue/mmap/queue_test.go                   | 402 +++++++++++++++++++++
 plugins/queue/mmap/segment/segment.go              |  68 ++++
 plugins/queue/mmap/segment/segment_test.go         | 187 ++++++++++
 plugins/queue/mmap/segment_operation.go            | 142 ++++++++
 plugins/queue/mmap/serializer.go                   |  74 ++++
 plugins/queue/{api => }/queue_repository.go        |  18 +-
 plugins/receiver/api/receiver.go                   |   2 +-
 31 files changed, 2004 insertions(+), 186 deletions(-)

diff --git a/Makefile b/Makefile
index dbbc0e8..9ff7285 100644
--- a/Makefile
+++ b/Makefile
@@ -25,6 +25,7 @@ RELEASE_SRC = skywalking-satellite-$(VERSION)-src
 OS = $(shell uname)
 
 GO = go
+GIT = git
 GO_PATH = $$($(GO) env GOPATH)
 GO_BUILD = $(GO) build
 GO_GET = $(GO) get
@@ -36,7 +37,7 @@ GO_BUILD_FLAGS = -v
 GO_BUILD_LDFLAGS = -X main.version=$(VERSION)
 GQL_GEN = $(GO_PATH)/bin/gqlgen
 
-PLATFORMS := windows linux darwin
+PLATFORMS := linux darwin
 os = $(word 1, $@)
 ARCH = amd64
 
@@ -72,7 +73,7 @@ clean: tools
 	-rm -rf coverage.txt
 
 .PHONY: build
-build: deps windows linux darwin
+build: deps linux darwin
 
 
 .PHONY: $(PLATFORMS)
diff --git a/dist/LICENSE b/dist/LICENSE
index 618fe55..38f0d7a 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -222,7 +222,7 @@ BSD licenses
 
 The following components are provided under a BSD license. See project link for details.
 The text of each license is also included at licenses/LICENSE-[project].txt.
-
+    google (go-cmp) v0.5.4: https://github.com/google/go-cmp BSD
 
 ========================================================================
 MIT licenses
@@ -234,3 +234,4 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
 	sirupsen (logrus) v1.7.0: https://github.com/sirupsen/logrus MIT
 	spf13 (viper) v1.7.1: https://github.com/spf13/viper MIT
 	urfave (cli) v2.3.0: https://github.com/urfave/cli MIT
+	grandecola (mmap) v0.6.0: https://github.com/grandecola/mmap MIT
diff --git a/dist/licenses/LICENSE-go-cmp b/dist/licenses/LICENSE-go-cmp
new file mode 100644
index 0000000..32017f8
--- /dev/null
+++ b/dist/licenses/LICENSE-go-cmp
@@ -0,0 +1,27 @@
+Copyright (c) 2017 The Go Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+   * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+   * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+   * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/dist/licenses/LICENSE-mmap b/dist/licenses/LICENSE-mmap
new file mode 100644
index 0000000..be28ae0
--- /dev/null
+++ b/dist/licenses/LICENSE-mmap
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Aman Mangal
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/docs/configuration/queue.md b/docs/configuration/queue.md
new file mode 100644
index 0000000..b7d81b1
--- /dev/null
+++ b/docs/configuration/queue.md
@@ -0,0 +1,11 @@
+# queue configuration
+
+|  Type   | Param  | DefaultValue| Meaning| 
+|  ----  | ----  |----  | ----  |
+| mmap-queue  | segment_size | 131072 | The size of each segment. The unit is Byte.
+| mmap-queue  | max_in_mem_segments | 10 | The max num of segments in memory.
+| mmap-queue  | queue_capacity_segments | 4000 | The capacity of Queue = segment_size * queue_capacity_segments.
+| mmap-queue  | flush_period | 1000 | The interval between two flushes. The unit is ms.
+| mmap-queue  | flush_ceiling_num | 10000 | The max number in one flush time.
+| mmap-queue  | queue_dir | satellite-mmap-queue | The directory that contains all files of the queue.
+| mmap-queue  | max_event_size | 20480 |The max size of the input event. The unit is Byte.
diff --git a/docs/plugins/queue/mmap/README.md b/docs/plugins/queue/mmap/README.md
new file mode 100644
index 0000000..9c8379c
--- /dev/null
+++ b/docs/plugins/queue/mmap/README.md
@@ -0,0 +1,66 @@
+# Design
+The mmap-queue is a big, fast, and persistent queue based on memory-mapped files. Each mmap-queue has a directory that stores all of its data. The queue directory is made up of many segments and one meta file.
+
+- Segment: A segment is where the data is actually stored. By using mmap, it provides large storage space while keeping the impact on read and write performance as small as possible. Segment files are reused rather than deleted.
+- Meta: The purpose of meta is to find the data that the consumer needs.
+
+## Meta
+The meta file needs only 80B to store the metadata of the pipe. However, for memory alignment, it occupies at least one memory page, which is generally 4K.
+```
+[   8Byte   ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte ][ 8Byte  ]
+[metaVersion][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][capacity]
+[metaVersion][writing   offset][watermark offset][committed offset][reading   offset][capacity]
+
+```
+### Transforming
+
+![](https://skywalking.apache.org/blog/2020-11-25-skywalking-satellite-0.1.0-design/offset-convert.jpg)
+
+## Configuration
+[Configuration Params](../../../configuration/queue.md)
+
+## Segment
+Segments are a series of files of the same size. Each input record costs `8Byte+Len(data)Byte` to store: the first 8 bytes hold `Len(data)`, which is used to locate the end of the record, followed by the raw bytes.
+### Swapper
+To balance performance and resource usage, we define a page replacement policy.
+
+- Keep the segments that are currently being read and written in memory.
+- When the number of mmapped segments is greater than or equal to max_in_mem_segments, we first scan the read scope and then the written scope to swap segments out, while guaranteeing that the segments currently being read or written always stay in memory.
+    - Read scope: [reading_offset - max_in_mem_segments,reading_offset - 1]
+    - Written scope: [writing_offset - max_in_mem_segments,writing_offset - 1]
+    - Each swap operation guarantees that at least `max_in_mem_segments/2-1` segments of capacity become available. The subtracted amount accounts for the segments that must always stay in memory.
+
+## BenchmarkTest
+Base machine: MacBook Pro 2018, Intel Core i7, 16 GB 2400 MHz DDR4, SSD
+
+### push operation
+
+```
+BenchmarkPush
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:8KB
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:8KB-12         	   42470	     25098 ns/op	     466 B/op	      10 allocs/op
+
+BenchmarkPush/segmentSize:_256KB_maxInMemSegments:10_message:8KB
+BenchmarkPush/segmentSize:_256KB_maxInMemSegments:10_message:8KB-12         	   76905	     18910 ns/op	     418 B/op	      10 allocs/op
+
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:20_message:8KB
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:20_message:8KB-12         	   58221	     22258 ns/op	     465 B/op	      10 allocs/op
+
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:16KB
+BenchmarkPush/segmentSize:_128KB_maxInMemSegments:10_message:16KB-12        	   34053	     48635 ns/op	     566 B/op	      11 allocs/op
+```
+### push and pop operation
+```
+BenchmarkPushAndPop
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:8KB
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:8KB-12         	   22273	     45872 ns/op	   19512 B/op	      40 allocs/op
+
+BenchmarkPushAndPop/segmentSize:_256KB_maxInMemSegments:10_message:8KB
+BenchmarkPushAndPop/segmentSize:_256KB_maxInMemSegments:10_message:8KB-12         	   38874	     37169 ns/op	   19456 B/op	      40 allocs/op
+
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:20_message:8KB
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:20_message:8KB-12         	   38048	     36274 ns/op	   19514 B/op	      40 allocs/op
+
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:16KB
+BenchmarkPushAndPop/segmentSize:_128KB_maxInMemSegments:10_message:16KB-12        	   19768	     63399 ns/op	   36893 B/op	      41 allocs/op
+```
diff --git a/go.mod b/go.mod
index 3bf717f..1c012a9 100644
--- a/go.mod
+++ b/go.mod
@@ -3,8 +3,9 @@ module github.com/apache/skywalking-satellite
 go 1.14
 
 require (
+	github.com/google/go-cmp v0.5.4
+	github.com/grandecola/mmap v0.6.0
 	github.com/sirupsen/logrus v1.7.0
 	github.com/spf13/viper v1.7.1
 	github.com/urfave/cli/v2 v2.3.0
 )
-
diff --git a/go.sum b/go.sum
index e84cc08..23f05df 100644
--- a/go.sum
+++ b/go.sum
@@ -11,6 +11,7 @@ cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqCl
 cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
 cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
+github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
@@ -59,15 +60,22 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
 github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
+github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
 github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
 github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
 github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
 github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
 github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
 github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
 github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/grandecola/mmap v0.6.0 h1:dYrZWLay1rDlmlsGsSIoXGQ+JMu/t2ZnKt8vT1h+1o0=
+github.com/grandecola/mmap v0.6.0/go.mod h1:q5v9jpm393rcp5PXE6biArHKc2SWJBpXjfxSRtQMtNU=
 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
 github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
@@ -95,14 +103,17 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J
 github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
 github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
+github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
 github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
@@ -150,7 +161,9 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
 github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
+github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
 github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
 github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
@@ -168,11 +181,11 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
-github.com/urfave/cli v1.22.5 h1:lNq9sAHXK2qfdI8W+GRItjCEkI+2oR4d+MEHy1CKXoU=
 github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M=
 github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
@@ -267,6 +280,8 @@ golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtn
 golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc h1:NCy3Ohtk6Iny5V/reW2Ktypo4zIpWBdRJ1uFMjBxdg8=
 golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
 google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
 google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
@@ -290,6 +305,7 @@ google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiq
 google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
 gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
diff --git a/internal/pkg/event/event.go b/internal/pkg/event/event.go
index a834fae..2ed10cf 100644
--- a/internal/pkg/event/event.go
+++ b/internal/pkg/event/event.go
@@ -39,56 +39,31 @@ type Type int32
 // Offset is a generic form, which allows having different definitions in different Queues.
 type Offset string
 
-// Event that implement this interface would be allowed to transmit in the Satellite.
-type Event interface {
-	// Name returns the event name.
-	Name() string
-
-	// Meta is a pair of key and value to record meta data, such as labels.
-	Meta() map[string]string
-
-	// Data returns the wrapped data.
-	Data() interface{}
-
-	// Time returns the event time.
-	Time() time.Time
-
-	// Type is to distinguish different events.
-	Type() Type
-
-	// IsRemote means is a output event when returns true.
-	IsRemote() bool
-}
-
-// SerializableEvent is used in Collector to bridge Queue.
-type SerializableEvent interface {
-	Event
-
-	// ToBytes serialize the event to a byte array.
-	ToBytes() []byte
-
-	// FromBytes deserialize the byte array to an event.
-	FromBytes(bytes []byte) SerializableEvent
+type Event struct {
+	Name      string
+	Timestamp time.Time
+	Meta      map[string]string
+	Type      Type
+	Remote    bool
+	Data      map[string]interface{}
 }
 
 // BatchEvents is used by Forwarder to forward.
-type BatchEvents []Event
+type BatchEvents []*Event
 
 // OutputEventContext is a container to store the output context.
 type OutputEventContext struct {
-	Context map[string]Event
+	Context map[string]*Event
 	Offset  Offset
 }
 
-// Put puts the incoming event into the context when the event is a remote event.
-func (c *OutputEventContext) Put(event Event) {
-	if event.IsRemote() {
-		c.Context[event.Name()] = event
-	}
+// Put puts the incoming event into the context.
+func (c *OutputEventContext) Put(event *Event) {
+	c.Context[event.Name] = event
 }
 
-// Get returns a event in the context. When the eventName does not exist, a error would be returned.
-func (c *OutputEventContext) Get(eventName string) (Event, error) {
+// Get returns an event in the context. When the eventName does not exist, an error would be returned.
+func (c *OutputEventContext) Get(eventName string) (*Event, error) {
 	e, ok := c.Context[eventName]
 	if !ok {
 		err := fmt.Errorf("cannot find the event name in OutputEventContext : %s", eventName)
diff --git a/internal/satellite/event/event.go b/internal/satellite/event/event.go
deleted file mode 100644
index a53720b..0000000
--- a/internal/satellite/event/event.go
+++ /dev/null
@@ -1,109 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package event
-
-import (
-	"time"
-
-	"github.com/apache/skywalking-satellite/internal/pkg/event"
-)
-
-// Event defines the common fields.
-type Event struct {
-	name      string
-	timestamp time.Time
-	meta      map[string]string
-	eventType event.Type
-	remote    bool
-}
-
-// ETBFunc serialize event to bytes.
-type ETBFunc func(event event.SerializableEvent) []byte
-
-// BToFunc deserialize bytes to event.
-type BToFunc func(bytes []byte) event.SerializableEvent
-
-// StructuredEvent works when the data is a struct type.
-type StructuredEvent struct {
-	Event
-	data interface{}
-}
-
-// StructuredEvent works when the data is not a struct type.
-type UnstructuredEvent struct {
-	Event
-	data map[string]interface{}
-}
-
-// StructuredEvent works when the data is a struct type in the collector.
-type StructuredSerializableEvent struct {
-	StructuredEvent
-	etb ETBFunc
-	bte BToFunc
-}
-
-// StructuredEvent works when the data is not a struct type in the collector.
-type UnstructuredSerializableEvent struct {
-	UnstructuredEvent
-	etb ETBFunc
-	bte BToFunc
-}
-
-func (s *Event) Name() string {
-	return s.name
-}
-
-func (s *Event) Meta() map[string]string {
-	return s.meta
-}
-
-func (s *Event) Time() time.Time {
-	return s.timestamp
-}
-
-func (s *Event) Type() event.Type {
-	return s.eventType
-}
-
-func (s *Event) IsRemote() bool {
-	return s.remote
-}
-
-func (u *StructuredEvent) Data() interface{} {
-	return u.data
-}
-
-func (u *UnstructuredEvent) Data() interface{} {
-	return u.data
-}
-
-func (s *StructuredSerializableEvent) ToBytes() []byte {
-	return s.etb(s)
-}
-
-func (s *StructuredSerializableEvent) FromBytes(bytes []byte) event.SerializableEvent {
-	return s.bte(bytes)
-}
-
-func (u *UnstructuredSerializableEvent) ToBytes() []byte {
-	return u.etb(u)
-}
-
-func (u *UnstructuredSerializableEvent) FromBytes(bytes []byte) event.SerializableEvent {
-	return u.bte(bytes)
-}
diff --git a/internal/satellite/module/gatherer/fetcher_gatherer.go b/internal/satellite/module/gatherer/fetcher_gatherer.go
index 77ac6d0..f218657 100644
--- a/internal/satellite/module/gatherer/fetcher_gatherer.go
+++ b/internal/satellite/module/gatherer/fetcher_gatherer.go
@@ -47,7 +47,7 @@ func (f *FetcherGatherer) Prepare() error {
 
 func (f *FetcherGatherer) Boot(ctx context.Context) {
 	var wg sync.WaitGroup
-	wg.Add(1)
+	wg.Add(2)
 	go func() {
 		defer wg.Done()
 		timeTicker := time.NewTicker(time.Duration(f.config.FetchInterval) * time.Millisecond)
@@ -55,21 +55,37 @@ func (f *FetcherGatherer) Boot(ctx context.Context) {
 			select {
 			case <-timeTicker.C:
 				events := f.runningFetcher.Fetch()
-				for _, event := range events {
-					err := f.runningQueue.Push(event)
+				for _, e := range events {
+					err := f.runningQueue.Push(e)
 					if err != nil {
 						// todo add abandonedCount metrics
 						log.Logger.Errorf("cannot put event into queue in %s namespace, %v", f.config.NamespaceName, err)
 					}
 				}
-			case e := <-f.runningQueue.Pop():
-				f.outputChannel <- e
 			case <-ctx.Done():
 				f.Shutdown()
 				return
 			}
 		}
 	}()
+
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-ctx.Done():
+				f.Shutdown()
+				return
+			default:
+				if e, err := f.runningQueue.Pop(); err == nil {
+					f.outputChannel <- e
+				} else {
+					log.Logger.Errorf("error in popping from the queue: %v", err)
+					time.Sleep(time.Second)
+				}
+			}
+		}
+	}()
 	wg.Wait()
 }
 
diff --git a/internal/satellite/module/gatherer/receiver_gatherer.go b/internal/satellite/module/gatherer/receiver_gatherer.go
index 5735474..de812a5 100644
--- a/internal/satellite/module/gatherer/receiver_gatherer.go
+++ b/internal/satellite/module/gatherer/receiver_gatherer.go
@@ -20,6 +20,7 @@ package gatherer
 import (
 	"context"
 	"sync"
+	"time"
 
 	"github.com/apache/skywalking-satellite/internal/pkg/event"
 	"github.com/apache/skywalking-satellite/internal/pkg/log"
@@ -45,7 +46,7 @@ type ReceiverGatherer struct {
 func (r *ReceiverGatherer) Prepare() error {
 	log.Logger.Infof("receiver gatherer module of %s namespace is preparing", r.config.NamespaceName)
 	r.runningReceiver.RegisterHandler(r.runningServer)
-	if err := r.runningQueue.Prepare(); err != nil {
+	if err := r.runningQueue.Initialize(); err != nil {
 		log.Logger.Infof("the %s queue of %s namespace was failed to initialize", r.runningQueue.Name(), r.config.NamespaceName)
 		return err
 	}
@@ -54,7 +55,7 @@ func (r *ReceiverGatherer) Prepare() error {
 
 func (r *ReceiverGatherer) Boot(ctx context.Context) {
 	var wg sync.WaitGroup
-	wg.Add(1)
+	wg.Add(2)
 	go func() {
 		defer wg.Done()
 		for {
@@ -65,14 +66,30 @@ func (r *ReceiverGatherer) Boot(ctx context.Context) {
 					// todo add abandonedCount metrics
 					log.Logger.Errorf("cannot put event into queue in %s namespace, error is: %v", r.config.NamespaceName, err)
 				}
-			case e := <-r.runningQueue.Pop():
-				r.outputChannel <- e
 			case <-ctx.Done():
 				r.Shutdown()
 				return
 			}
 		}
 	}()
+
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-ctx.Done():
+				r.Shutdown()
+				return
+			default:
+				if e, err := r.runningQueue.Pop(); err == nil {
+					r.outputChannel <- e
+				} else {
+					log.Logger.Errorf("error in popping from the queue: %v", err)
+					time.Sleep(time.Second)
+				}
+			}
+		}
+	}()
 	wg.Wait()
 }
 
diff --git a/internal/satellite/module/processor/processor.go b/internal/satellite/module/processor/processor.go
index 640f605..974741a 100644
--- a/internal/satellite/module/processor/processor.go
+++ b/internal/satellite/module/processor/processor.go
@@ -60,7 +60,7 @@ func (p *Processor) Boot(ctx context.Context) {
 			case e := <-p.gatherer.OutputDataChannel():
 				c := &event.OutputEventContext{
 					Offset:  e.Offset,
-					Context: make(map[string]event.Event),
+					Context: make(map[string]*event.Event),
 				}
 				c.Put(e.Event)
 				// processing the event with filters, that put the necessary events to OutputEventContext.
diff --git a/internal/satellite/module/sender/sender.go b/internal/satellite/module/sender/sender.go
index 19a469b..37c8b1c 100644
--- a/internal/satellite/module/sender/sender.go
+++ b/internal/satellite/module/sender/sender.go
@@ -134,8 +134,8 @@ func (s *Sender) consume(batch *buffer.BatchBuffer) {
 	for i := 0; i < batch.Len(); i++ {
 		eventContext := batch.Buf()[i]
 		for _, e := range eventContext.Context {
-			if e.IsRemote() {
-				events[e.Type()] = append(events[e.Type()], e)
+			if e.Remote {
+				events[e.Type] = append(events[e.Type], e)
 			}
 		}
 	}
diff --git a/plugins/fetcher/api/fetcher.go b/plugins/fetcher/api/fetcher.go
index 4021473..83139a9 100644
--- a/plugins/fetcher/api/fetcher.go
+++ b/plugins/fetcher/api/fetcher.go
@@ -27,5 +27,5 @@ type Fetcher interface {
 	plugin.Plugin
 
 	// Fetch would fetch some APM data.
-	Fetch() []event.SerializableEvent
+	Fetch() event.BatchEvents
 }
diff --git a/plugins/init.go b/plugins/init.go
index 2bf3bf4..97f0177 100644
--- a/plugins/init.go
+++ b/plugins/init.go
@@ -24,7 +24,7 @@ import (
 	filter "github.com/apache/skywalking-satellite/plugins/filter/api"
 	forwarder "github.com/apache/skywalking-satellite/plugins/forwarder/api"
 	parser "github.com/apache/skywalking-satellite/plugins/parser/api"
-	queue "github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue"
 	receiver "github.com/apache/skywalking-satellite/plugins/receiver/api"
 	server "github.com/apache/skywalking-satellite/plugins/server/api"
 )
diff --git a/plugins/parser/api/parser.go b/plugins/parser/api/parser.go
index 754037f..3e09e3a 100644
--- a/plugins/parser/api/parser.go
+++ b/plugins/parser/api/parser.go
@@ -27,8 +27,8 @@ type Parser interface {
 	plugin.Plugin
 
 	// ParseBytes parse the byte buffer into events.
-	ParseBytes(bytes []byte) ([]event.SerializableEvent, error)
+	ParseBytes(bytes []byte) (event.BatchEvents, error)
 
 	// ParseStr parse the string into events.
-	ParseStr(str string) ([]event.SerializableEvent, error)
+	ParseStr(str string) (event.BatchEvents, error)
 }
diff --git a/plugins/queue/api/queue.go b/plugins/queue/api/queue.go
index f25b2ff..a5ad163 100644
--- a/plugins/queue/api/queue.go
+++ b/plugins/queue/api/queue.go
@@ -18,6 +18,8 @@
 package api
 
 import (
+	"reflect"
+
 	"github.com/apache/skywalking-satellite/internal/pkg/event"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 )
@@ -26,14 +28,14 @@ import (
 type Queue interface {
 	plugin.Plugin
 
-	// Prepare creates the queue.
-	Prepare() error
+	// Initialize creates the queue.
+	Initialize() error
 
 	// Push a inputEvent into the queue.
-	Push(event event.SerializableEvent) error
+	Push(event *event.Event) error
 
 	// Pop returns a SequenceEvent when Queue is not empty,
-	Pop() chan *SequenceEvent
+	Pop() (*SequenceEvent, error)
 
 	// Close would close the queue.
 	Close() error
@@ -44,6 +46,11 @@ type Queue interface {
 
 // SequenceEvent is a wrapper to pass the event and the offset.
 type SequenceEvent struct {
-	Event  event.Event
+	Event  *event.Event
 	Offset event.Offset
 }
+
+// GetQueue returns an initialized queue plugin.
+func GetQueue(config plugin.Config) Queue {
+	return plugin.Get(reflect.TypeOf((*Queue)(nil)).Elem(), config).(Queue)
+}
diff --git a/plugins/queue/mmap/README.md b/plugins/queue/mmap/README.md
deleted file mode 100644
index 3f03ea1..0000000
--- a/plugins/queue/mmap/README.md
+++ /dev/null
@@ -1 +0,0 @@
-# Plugin description
\ No newline at end of file
diff --git a/plugins/queue/mmap/branchmark_test.go b/plugins/queue/mmap/branchmark_test.go
new file mode 100644
index 0000000..4bf9ee4
--- /dev/null
+++ b/plugins/queue/mmap/branchmark_test.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"fmt"
+	"os"
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+type benchmarkParam struct {
+	segmentSize      int
+	message          int // unit KB
+	maxInMemSegments int
+}
+
+var params = []benchmarkParam{
+	{segmentSize: 1024 * 128, message: 8, maxInMemSegments: 10},
+	// compare the influence of the segmentSize.
+	{segmentSize: 1024 * 256, message: 8, maxInMemSegments: 10},
+	// compare the influence of the maxInMemSegments.
+	{segmentSize: 1024 * 128, message: 8, maxInMemSegments: 20},
+	// compare the influence of the message size.
+	{segmentSize: 1024 * 128, message: 16, maxInMemSegments: 10},
+}
+
+func cleanBenchmarkQueue(b *testing.B, q api.Queue) {
+	if err := os.RemoveAll(q.(*Queue).QueueDir); err != nil {
+		b.Errorf("cannot remove test queue dir, %v", err)
+	}
+}
+
+// BenchmarkPush measures Push alone for every benchmarkParam combination in params.
+func BenchmarkPush(b *testing.B) {
+	for _, param := range params {
+		name := fmt.Sprintf("segmentSize: %dKB maxInMemSegments:%d message:%dKB", param.segmentSize/1024, param.maxInMemSegments, param.message)
+		b.Run(name, func(b *testing.B) {
+			q, err := initMmapQueue(plugin.Config{
+				"queue_dir":               "BenchmarkPush",
+				"segment_size":            param.segmentSize,
+				"max_in_mem_segments":     param.maxInMemSegments,
+				"queue_capacity_segments": 10000,
+			})
+			if err != nil {
+				b.Fatalf("cannot get a mmap queue: %v", err)
+			}
+			event := getLargeEvent(param.message)
+			b.ReportAllocs()
+			b.ResetTimer() // nothing but the push loop may run while the timer is on.
+			for i := 0; i < b.N; i++ {
+				if err := q.Push(event); err != nil {
+					b.Fatalf("error in pushing: %v", err)
+				}
+			}
+			b.StopTimer()
+			_ = q.Close() // best effort: the queue directory is removed right below.
+			cleanBenchmarkQueue(b, q)
+		})
+	}
+}
+
+// BenchmarkPushAndPop measures one Push followed by one Pop for every benchmarkParam in params.
+func BenchmarkPushAndPop(b *testing.B) {
+	for _, param := range params {
+		name := fmt.Sprintf("segmentSize: %dKB maxInMemSegments:%d message:%dKB", param.segmentSize/1024, param.maxInMemSegments, param.message)
+		b.Run(name, func(b *testing.B) {
+			q, err := initMmapQueue(plugin.Config{
+				"queue_dir":           "BenchmarkPushAndPop",
+				"segment_size":        param.segmentSize,
+				"max_in_mem_segments": param.maxInMemSegments,
+			})
+			if err != nil {
+				b.Fatalf("cannot get a mmap queue: %v", err)
+			}
+			event := getLargeEvent(param.message)
+			b.ReportAllocs()
+			b.ResetTimer() // nothing but the push/pop loop may run while the timer is on.
+			for i := 0; i < b.N; i++ {
+				if err := q.Push(event); err != nil {
+					b.Fatalf("error in pushing: %v", err)
+				}
+				if _, err := q.Pop(); err != nil {
+					b.Fatalf("error in popping: %v", err)
+				}
+			}
+			b.StopTimer()
+			_ = q.Close() // best effort: the queue directory is removed right below.
+			cleanBenchmarkQueue(b, q)
+		})
+	}
+}
diff --git a/plugins/queue/mmap/meta/meta.go b/plugins/queue/mmap/meta/meta.go
new file mode 100644
index 0000000..cfa32a3
--- /dev/null
+++ b/plugins/queue/mmap/meta/meta.go
@@ -0,0 +1,153 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package meta
+
+import (
+	"fmt"
+	"syscall"
+
+	"path/filepath"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+const (
+	metaSize    = 80
+	metaName    = "meta.dat"
+	metaVersion = 1
+)
+
+// Metadata only needs 80B to store the Metadata for the pipe. But for memory alignment,
+// it takes at least one memory page size, which is generally 4K.
+//
+// [    8B     ][  8B   ][  8B   ][  8B   ][  8B   ][  8B   ][  8B   ][  8B   ][  8B   ][  8B    ]
+// [metaVersion][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][capacity]
+// [metaVersion][writing   offset][watermark offset][committed offset][reading   offset][capacity]
+type Metadata struct {
+	metaFile *mmap.File
+	name     string
+	size     int
+	capacity int
+}
+
+// NewMetaData reads or creates the metadata file in metaDir and validates it.
+// It returns an error, and unmaps the file, when the stored version or the
+// stored capacity is non-zero and differs from metaVersion or capacity.
+func NewMetaData(metaDir string, capacity int) (*Metadata, error) {
+	path := filepath.Join(metaDir, metaName)
+	metaFile, err := segment.NewSegment(path, metaSize)
+	if err != nil {
+		return nil, fmt.Errorf("error in creating the Metadata memory mapped file: %v", err)
+	}
+
+	m := &Metadata{metaFile: metaFile, name: metaName, size: metaSize, capacity: capacity}
+	// A stored zero is accepted as "not written yet"; only a non-zero value
+	// that differs from the expected one is rejected.
+	if v := m.GetVersion(); v != 0 && v != metaVersion {
+		_ = metaFile.Unmap() // best effort: the mismatch is the error worth reporting.
+		return nil, fmt.Errorf("metadata metaVersion is not matching, the Metadata metaVersion is %d", v)
+	}
+	if c := m.GetCapacity(); c != 0 && c != capacity {
+		_ = metaFile.Unmap() // best effort: the mismatch is the error worth reporting.
+		return nil, fmt.Errorf("metadata capacity is not equal to the old capacity, the old capacity is %d", c)
+	}
+
+	// Stamp the header; on a file that already matches this rewrites the same values.
+	m.PutVersion(metaVersion)
+	m.PutCapacity(int64(capacity))
+	return m, nil
+}
+
+// GetVersion returns the meta version.
+func (m *Metadata) GetVersion() int {
+	return int(m.metaFile.ReadUint64At(0))
+}
+
+// PutVersion put the version into the memory mapped file.
+func (m *Metadata) PutVersion(version int64) {
+	m.metaFile.WriteUint64At(uint64(version), 0)
+}
+
+// GetWritingOffset returns the writing offset, which contains the segment ID and the offset of the segment.
+func (m *Metadata) GetWritingOffset() (segmentID, offset int64) {
+	return int64(m.metaFile.ReadUint64At(8)), int64(m.metaFile.ReadUint64At(16))
+}
+
+// PutWritingOffset put the segment ID and the offset of the segment into the writing offset.
+func (m *Metadata) PutWritingOffset(segmentID, offset int64) {
+	m.metaFile.WriteUint64At(uint64(segmentID), 8)
+	m.metaFile.WriteUint64At(uint64(offset), 16)
+}
+
+// GetWatermarkOffset returns the watermark offset, which contains the segment ID and the offset of the segment.
+func (m *Metadata) GetWatermarkOffset() (segmentID, offset int64) {
+	return int64(m.metaFile.ReadUint64At(24)), int64(m.metaFile.ReadUint64At(32))
+}
+
+// PutWatermarkOffset put the segment ID and the offset of the segment into the watermark offset.
+func (m *Metadata) PutWatermarkOffset(segmentID, offset int64) {
+	m.metaFile.WriteUint64At(uint64(segmentID), 24)
+	m.metaFile.WriteUint64At(uint64(offset), 32)
+}
+
+// GetCommittedOffset returns the committed offset, which contains the segment ID and the offset of the segment.
+func (m *Metadata) GetCommittedOffset() (segmentID, offset int64) {
+	return int64(m.metaFile.ReadUint64At(40)), int64(m.metaFile.ReadUint64At(48))
+}
+
+// PutCommittedOffset put the segment ID and the offset of the segment into the committed offset.
+func (m *Metadata) PutCommittedOffset(segmentID, offset int64) {
+	m.metaFile.WriteUint64At(uint64(segmentID), 40)
+	m.metaFile.WriteUint64At(uint64(offset), 48)
+}
+
+// GetReadingOffset returns the reading offset, which contains the segment ID and the offset of the segment.
+func (m *Metadata) GetReadingOffset() (segmentID, offset int64) {
+	return int64(m.metaFile.ReadUint64At(56)), int64(m.metaFile.ReadUint64At(64))
+}
+
+// PutReadingOffset put the segment ID and the offset of the segment into the reading offset.
+func (m *Metadata) PutReadingOffset(segmentID, offset int64) {
+	m.metaFile.WriteUint64At(uint64(segmentID), 56)
+	m.metaFile.WriteUint64At(uint64(offset), 64)
+}
+
+// GetCapacity returns the capacity of the queue.
+func (m *Metadata) GetCapacity() int {
+	return int(m.metaFile.ReadUint64At(72))
+}
+
+// PutCapacity puts the capacity into the memory mapped file at byte offset 72.
+func (m *Metadata) PutCapacity(capacity int64) {
+	m.metaFile.WriteUint64At(uint64(capacity), 72)
+}
+
+// Flush the memory mapped file to the disk.
+func (m *Metadata) Flush() error {
+	return m.metaFile.Flush(syscall.MS_SYNC)
+}
+
+// Close do Flush operation and unmap the memory mapped file.
+func (m *Metadata) Close() error {
+	if err := m.Flush(); err != nil {
+		return err
+	}
+	return m.metaFile.Unmap()
+}
diff --git a/plugins/queue/mmap/meta/meta_test.go b/plugins/queue/mmap/meta/meta_test.go
new file mode 100644
index 0000000..fcc6b57
--- /dev/null
+++ b/plugins/queue/mmap/meta/meta_test.go
@@ -0,0 +1,208 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package meta
+
+import (
+	"os"
+	"reflect"
+	"testing"
+)
+
+type args struct {
+	metaVersion int
+	capacity    int
+	// ID
+	writingSegID   int64
+	readingSegID   int64
+	committedSegID int64
+	watermarkSegID int64
+	// offset
+	writingSegOffset   int64
+	readingSegOffset   int64
+	committedSegOffset int64
+	watermarkSegOffset int64
+}
+
+type tests struct {
+	name    string
+	args    args
+	want    args
+	wantErr bool
+}
+
+func Test_newMetaData(t *testing.T) {
+	const testDir = "testMeta"
+	const preCapacity = 500
+	params := []tests{
+		buildMetaverionErrorTest(),
+		buildNormalTest(),
+		buildCapacitynErrorTest(),
+	}
+
+	for _, tt := range params {
+		t.Run(tt.name, func(t *testing.T) {
+			// clean
+			defer func() {
+				if err := os.RemoveAll(testDir); err != nil {
+					t.Errorf("remove Metadata dir error: %v", err)
+				}
+			}()
+
+			got, err := NewMetaData(testDir, preCapacity)
+			if err != nil {
+				t.Errorf("cannot create Metadata file: %v", err)
+				return
+			}
+
+			// write args
+			got.PutVersion(int64(tt.args.metaVersion))
+			got.PutWritingOffset(tt.args.writingSegID, tt.args.writingSegOffset)
+			got.PutReadingOffset(tt.args.readingSegID, tt.args.readingSegOffset)
+			got.PutCommittedOffset(tt.args.committedSegID, tt.args.committedSegOffset)
+			got.PutWatermarkOffset(tt.args.watermarkSegID, tt.args.watermarkSegOffset)
+			if err := got.Close(); err != nil { // report the Close error itself, not the stale nil err above.
+				t.Errorf("cannot Close the Metadata file: %v", err)
+				return
+			}
+
+			oldMeta, err := NewMetaData(testDir, tt.args.capacity)
+			if err != nil {
+				if tt.wantErr {
+					return
+				}
+				t.Errorf("cannot read old Metadata file: %v", err)
+				return
+			}
+
+			// read args
+			wmID, wmOffset := oldMeta.GetWatermarkOffset()
+			cID, cOffset := oldMeta.GetCommittedOffset()
+			rID, rOffset := oldMeta.GetReadingOffset()
+			wID, wOffset := oldMeta.GetWritingOffset()
+
+			readArgs := args{
+				metaVersion:        oldMeta.GetVersion(),
+				capacity:           oldMeta.GetCapacity(),
+				watermarkSegID:     wmID,
+				writingSegID:       wID,
+				readingSegID:       rID,
+				committedSegID:     cID,
+				watermarkSegOffset: wmOffset,
+				readingSegOffset:   rOffset,
+				committedSegOffset: cOffset,
+				writingSegOffset:   wOffset,
+			}
+			if !reflect.DeepEqual(readArgs, tt.want) {
+				t.Errorf("want meta info is [%+v]\n ,but got [%+v]", tt.want, readArgs)
+			}
+		})
+	}
+}
+
+func buildMetaverionErrorTest() tests {
+	return tests{
+		name: "wrong version",
+		args: args{
+			metaVersion:        2,
+			capacity:           500,
+			watermarkSegID:     1,
+			writingSegID:       2,
+			readingSegID:       3,
+			committedSegID:     4,
+			watermarkSegOffset: 10,
+			readingSegOffset:   20,
+			committedSegOffset: 30,
+			writingSegOffset:   40,
+		},
+		want: args{
+			metaVersion:        2,
+			capacity:           500,
+			watermarkSegID:     1,
+			writingSegID:       2,
+			readingSegID:       3,
+			committedSegID:     4,
+			watermarkSegOffset: 10,
+			readingSegOffset:   20,
+			committedSegOffset: 30,
+			writingSegOffset:   40,
+		},
+		wantErr: true,
+	}
+}
+
+func buildCapacitynErrorTest() tests {
+	return tests{
+		name: "wrong capacity", // distinct from the "wrong version" case so the subtest names do not collide.
+		args: args{
+			metaVersion:        1,
+			capacity:           600,
+			watermarkSegID:     1,
+			writingSegID:       2,
+			readingSegID:       3,
+			committedSegID:     4,
+			watermarkSegOffset: 10,
+			readingSegOffset:   20,
+			committedSegOffset: 30,
+			writingSegOffset:   40,
+		},
+		want: args{
+			metaVersion:        1,
+			capacity:           600,
+			watermarkSegID:     1,
+			writingSegID:       2,
+			readingSegID:       3,
+			committedSegID:     4,
+			watermarkSegOffset: 10,
+			readingSegOffset:   20,
+			committedSegOffset: 30,
+			writingSegOffset:   40,
+		},
+		wantErr: true,
+	}
+}
+
+func buildNormalTest() tests {
+	return tests{
+		name: "correct version",
+		args: args{
+			metaVersion:        1,
+			capacity:           500,
+			watermarkSegID:     2,
+			writingSegID:       3,
+			readingSegID:       4,
+			committedSegID:     5,
+			watermarkSegOffset: 6,
+			readingSegOffset:   7,
+			committedSegOffset: 8,
+			writingSegOffset:   9,
+		},
+		want: args{
+			metaVersion:        1,
+			capacity:           500,
+			watermarkSegID:     2,
+			writingSegID:       3,
+			readingSegID:       4,
+			committedSegID:     5,
+			watermarkSegOffset: 6,
+			readingSegOffset:   7,
+			committedSegOffset: 8,
+			writingSegOffset:   9,
+		},
+		wantErr: false, // matching version and capacity: a failure to reopen must fail the test.
+	}
+}
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
new file mode 100644
index 0000000..e699420
--- /dev/null
+++ b/plugins/queue/mmap/queue.go
@@ -0,0 +1,192 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"sync"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/meta"
+)
+
+// Queue is a memory mapped queue to store the input data.
+type Queue struct {
+	sync.Mutex
+	// config
+	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int    `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
+	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+
+	// running components
+	meta                   *meta.Metadata // The metadata file.
+	segments               []*mmap.File   // The data files.
+	mmapCount              int            // The number of the memory mapped files.
+	unflushedNum           int            // The unflushed number.
+	flushChannel           chan struct{}  // The flushChannel channel would receive a signal when the unflushedNum reach the flush_ceiling_num.
+	insufficientMemChannel chan struct{}  // Notify when memory is insufficient
+	sufficientMemChannel   chan struct{}  // Notify when memory is sufficient
+
+	// control components
+	ctx        context.Context    // Parent ctx
+	cancel     context.CancelFunc // Parent ctx cancel function
+	showDownWg sync.WaitGroup     // The shutdown wait group.
+
+	bufPool *sync.Pool
+
+	encoder *Encoder
+	decoder *Decoder
+}
+
+func (q *Queue) Name() string {
+	return "mmap-queue"
+}
+
+func (q *Queue) Description() string {
+	return "this is a mmap queue"
+}
+
+func (q *Queue) DefaultConfig() string {
+	return `
+# The size of each segment. Default value is 128K. The unit is Byte.
+segment_size: 131072
+# The max num of segments in memory. Default value is 10.
+max_in_mem_segments: 10
+# The capacity of Queue = segment_size * queue_capacity_segments.
+queue_capacity_segments: 4000
+# The period flush time. The unit is ms. Default value is 1 second.
+flush_period: 1000
+# The max number in one flush time.  Default value is 10000.
+flush_ceiling_num: 10000
+# Contains all files in the queue.
+queue_dir: satellite-mmap-queue
+# The max size of the input event. Default value is 20k.
+max_event_size: 20480
+`
+}
+
+func (q *Queue) Initialize() error {
+	q.encoder = NewEncoder()
+	q.decoder = NewDecoder()
+
+	q.bufPool = &sync.Pool{New: func() interface{} {
+		return new(bytes.Buffer)
+	}}
+	// the size of each segment file should be a multiple of the page size.
+	pageSize := os.Getpagesize()
+	if q.SegmentSize%pageSize != 0 {
+		q.SegmentSize -= q.SegmentSize % pageSize
+	}
+	if q.SegmentSize/pageSize == 0 {
+		q.SegmentSize = 131072
+	}
+	// the minimum MaxInMemSegments value should be 4.
+	if q.MaxInMemSegments < 4 {
+		q.MaxInMemSegments = 4
+	}
+	// load metadata and override the reading or writing offset by the committed or watermark offset.
+	md, err := meta.NewMetaData(q.QueueDir, q.QueueCapacitySegments)
+	if err != nil {
+		return fmt.Errorf("error in creating the metadata: %v", err)
+	}
+	q.meta = md
+	cmID, cmOffset := md.GetCommittedOffset()
+	wmID, wmOffset := md.GetWatermarkOffset()
+	md.PutWritingOffset(wmID, wmOffset)
+	md.PutReadingOffset(cmID, cmOffset)
+	// keep the reading or writing segments in the memory.
+	q.segments = make([]*mmap.File, q.QueueCapacitySegments)
+	if _, err := q.GetSegment(cmID); err != nil {
+		return err
+	}
+	if _, err := q.GetSegment(wmID); err != nil {
+		return err
+	}
+	// init components
+	q.insufficientMemChannel = make(chan struct{})
+	q.sufficientMemChannel = make(chan struct{})
+	q.flushChannel = make(chan struct{})
+	q.ctx, q.cancel = context.WithCancel(context.Background())
+	// async supported processes.
+	q.showDownWg.Add(2)
+	go q.segmentSwapper()
+	go q.flush()
+	return nil
+}
+
+func (q *Queue) Push(e *event.Event) error {
+	data, err := q.encoder.serialize(e)
+	if err != nil {
+		return err
+	}
+	if len(data) > q.MaxEventSize {
+		return fmt.Errorf("cannot push the event to the queue because the size %dB is over ceiling", len(data))
+	}
+	return q.push(data)
+}
+
+func (q *Queue) Pop() (*api.SequenceEvent, error) {
+	data, id, offset, err := q.pop()
+	if err != nil {
+		return nil, err
+	}
+	e, err := q.decoder.deserialize(data)
+	if err != nil {
+		return nil, err
+	}
+	return &api.SequenceEvent{
+		Event:  e,
+		Offset: q.encodeOffset(id, offset),
+	}, nil
+}
+
+func (q *Queue) Close() error {
+	q.cancel()
+	q.showDownWg.Wait()
+	for i, segment := range q.segments {
+		if segment != nil {
+			err := segment.Unmap()
+			if err != nil {
+				log.Logger.Errorf("cannot unmap the segments: %d, %v", i, err)
+			}
+		}
+	}
+	if err := q.meta.Close(); err != nil {
+		log.Logger.Errorf("cannot unmap the metadata: %v", err)
+	}
+	return nil
+}
+
+func (q *Queue) Ack(lastOffset event.Offset) {
+	if id, offset, err := q.decodeOffset(lastOffset); err != nil {
+		log.Logger.Errorf("cannot ack queue with the offset:%s, error is: %v", lastOffset, err)
+	} else { // a failed decode yields 0, 0 and must not overwrite the committed offset.
+		q.meta.PutCommittedOffset(id, offset)
+	}
+}
diff --git a/plugins/queue/mmap/queue_opreation.go b/plugins/queue/mmap/queue_opreation.go
new file mode 100644
index 0000000..b73321e
--- /dev/null
+++ b/plugins/queue/mmap/queue_opreation.go
@@ -0,0 +1,236 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+)
+
+const uInt64Size = 8
+
+// flush runs doFlush on every period tick, on every flushChannel signal and once more on shutdown.
+func (q *Queue) flush() {
+	defer q.showDownWg.Done()
+	ctx, cancel := context.WithCancel(q.ctx)
+	defer cancel()
+	timeTicker := time.NewTicker(time.Duration(q.FlushPeriod) * time.Millisecond) // one ticker for the whole loop.
+	defer timeTicker.Stop() // a ticker that is never stopped keeps its resources alive.
+	for {
+		select {
+		case <-q.flushChannel: // push signals here when unflushedNum reaches FlushCeilingNum.
+		case <-timeTicker.C: // the flush period elapsed.
+		case <-ctx.Done():
+			q.doFlush() // final flush before the goroutine exits.
+			return
+		}
+		q.doFlush()
+	}
+}
+
+// doFlush flush the segment and meta files to the disk.
+func (q *Queue) doFlush() {
+	q.Lock()
+	defer q.Unlock()
+	for _, segment := range q.segments {
+		if segment == nil {
+			continue
+		}
+		if err := segment.Flush(syscall.MS_SYNC); err != nil {
+			log.Logger.Errorf("cannot flush segment file: %v", err)
+		}
+	}
+	wid, woffset := q.meta.GetWritingOffset()
+	q.meta.PutWatermarkOffset(wid, woffset)
+	if err := q.meta.Flush(); err != nil {
+		log.Logger.Errorf("cannot flush meta file: %v", err)
+	}
+}
+
+// push writes the data into the file system. It first writes the length of the data,
+// then the data itself. This means one record may span more than one segment.
+func (q *Queue) push(bytes []byte) error {
+	if q.isFull() {
+		return fmt.Errorf("cannot push data when the queue is full")
+	}
+	id, offset := q.meta.GetWritingOffset()
+	id, offset, err := q.writeLength(len(bytes), id, offset)
+	if err != nil {
+		return err
+	}
+	id, offset, err = q.writeBytes(bytes, id, offset)
+	if err != nil {
+		return err
+	}
+	q.meta.PutWritingOffset(id, offset)
+	q.unflushedNum++
+	if q.unflushedNum == q.FlushCeilingNum {
+		q.flushChannel <- struct{}{}
+		q.unflushedNum = 0
+	}
+	return nil
+}
+
+// pop reads the data from the file system. It first reads the length of the data,
+// then the data itself. This means one record may span more than one segment.
+func (q *Queue) pop() (data []byte, rid, roffset int64, err error) {
+	if q.isEmpty() {
+		return nil, 0, 0, fmt.Errorf("cannot read data when the queue is empty")
+	}
+	id, offset := q.meta.GetReadingOffset()
+	id, offset, length, err := q.readLength(id, offset)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	bytes, id, offset, err := q.readBytes(id, offset, length)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	q.meta.PutReadingOffset(id, offset)
+	return bytes, id, offset, nil
+}
+
+// readBytes reads bytes from the memory mapped file.
+func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, newOffset int64, err error) {
+	counter := 0
+	res := make([]byte, length)
+	for {
+		segment, err := q.GetSegment(id)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		readBytes, err := segment.ReadAt(res[counter:], offset)
+		if err != nil {
+			return nil, 0, 0, err
+		}
+		counter += readBytes
+		offset += int64(readBytes)
+		if offset == int64(q.SegmentSize) {
+			id, offset = id+1, 0
+		}
+		if counter == length {
+			break
+		}
+	}
+	return res, id, offset, nil
+}
+
+// readLength reads the data length, which is stored in 8 bytes.
+func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length int, err error) {
+	if offset+uInt64Size > int64(q.SegmentSize) {
+		id, offset = id+1, 0
+	}
+	segment, err := q.GetSegment(id)
+	if err != nil {
+		return 0, 0, 0, err
+	}
+	num := segment.ReadUint64At(offset)
+	offset += uInt64Size
+	if offset == int64(q.SegmentSize) {
+		id, offset = id+1, 0
+	}
+	return id, offset, int(num), nil
+}
+
+// writeLength writes the data length, which is stored in 8 bytes.
+func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int64, err error) {
+	if offset+uInt64Size > int64(q.SegmentSize) {
+		id, offset = id+1, 0
+	}
+	segment, err := q.GetSegment(id)
+	if err != nil {
+		return 0, 0, err
+	}
+	segment.WriteUint64At(uint64(length), offset)
+	offset += uInt64Size
+	if offset == int64(q.SegmentSize) {
+		id, offset = id+1, 0
+	}
+	return id, offset, nil
+}
+
+// writeBytes writes bytes into the memory mapped file.
+func (q *Queue) writeBytes(bytes []byte, id, offset int64) (newID, newOffset int64, err error) {
+	counter := 0
+	length := len(bytes)
+
+	for {
+		segment, err := q.GetSegment(id)
+		if err != nil {
+			return 0, 0, err
+		}
+		writtenBytes, err := segment.WriteAt(bytes[counter:], offset)
+		if err != nil {
+			return 0, 0, err
+		}
+		counter += writtenBytes
+		offset += int64(writtenBytes)
+		if offset == int64(q.SegmentSize) {
+			id, offset = id+1, 0
+		}
+		if counter == length {
+			break
+		}
+	}
+	return id, offset, nil
+}
+
+// isEmpty reports whether the reading offset equals the writing offset.
+func (q *Queue) isEmpty() bool {
+	rid, roffset := q.meta.GetReadingOffset()
+	wid, woffset := q.meta.GetWritingOffset()
+	return rid == wid && roffset == woffset
+}
+
+// isFull reports whether the writing segment ID has reached the capacity limit.
+func (q *Queue) isFull() bool {
+	rid, _ := q.meta.GetReadingOffset()
+	wid, _ := q.meta.GetWritingOffset()
+	// ensure enough spaces to promise data stability.
+	maxWid := rid + int64(q.QueueCapacitySegments) - 1 - int64(q.MaxEventSize/q.SegmentSize)
+	return wid >= maxWid
+}
+
+// encode the meta to the offset
+func (q *Queue) encodeOffset(id, offset int64) event.Offset {
+	return event.Offset(strconv.FormatInt(id, 10) + "-" + strconv.FormatInt(offset, 10))
+}
+
+// decode the offset to the meta of the mmap queue.
+func (q *Queue) decodeOffset(val event.Offset) (id, offset int64, err error) {
+	arr := strings.Split(string(val), "-")
+	if len(arr) == 2 {
+		id, err := strconv.ParseInt(arr[0], 10, 64)
+		if err != nil {
+			return 0, 0, err
+		}
+		offset, err := strconv.ParseInt(arr[1], 10, 64)
+		if err != nil {
+			return 0, 0, err
+		}
+		return id, offset, nil
+	}
+	return 0, 0, fmt.Errorf("the input offset string is illegal: %s", val)
+}
diff --git a/plugins/queue/mmap/queue_test.go b/plugins/queue/mmap/queue_test.go
new file mode 100644
index 0000000..915400e
--- /dev/null
+++ b/plugins/queue/mmap/queue_test.go
@@ -0,0 +1,402 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"bytes"
+	"fmt"
+	"os"
+	"reflect"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+)
+
+// initMmapQueue registers the mmap queue plugin, overlays cfg on a config that
+// only names the "mmap-queue" plugin, and returns the initialized queue.
+// The returned error carries the cause reported by Initialize.
+func initMmapQueue(cfg plugin.Config) (*Queue, error) {
+	log.Init(&log.LoggerConfig{})
+	plugin.RegisterPluginCategory(reflect.TypeOf((*api.Queue)(nil)).Elem())
+	plugin.RegisterPlugin(&Queue{})
+	var config plugin.Config = map[string]interface{}{
+		plugin.NameField: "mmap-queue",
+	}
+	for k, v := range cfg {
+		config[k] = v
+	}
+	q := api.GetQueue(config)
+	if q == nil {
+		return nil, fmt.Errorf("cannot get a default config mmap queue from the registry")
+	}
+	// keep the cause, otherwise a failing test only reports that "something" went wrong.
+	if err := q.Initialize(); err != nil {
+		return nil, fmt.Errorf("queue cannot initialize: %v", err)
+	}
+	mq, ok := q.(*Queue)
+	if !ok {
+		return nil, fmt.Errorf("the registered mmap-queue plugin has an unexpected type %T", q)
+	}
+	return mq, nil
+}
+
+// cleanTestQueue removes the directory of the given test queue.
+// It is a no-op when q is not a *Queue or holds a nil *Queue, so it is safe to
+// defer it even when the queue initialization failed.
+func cleanTestQueue(t *testing.T, q api.Queue) {
+	mq, ok := q.(*Queue)
+	if !ok || mq == nil {
+		return
+	}
+	if err := os.RemoveAll(mq.QueueDir); err != nil {
+		t.Errorf("cannot remove test queue dir, %v", err)
+	}
+}
+
+// getBatchEvents builds count log events named "event<i>". Every event shares
+// the same 2000 byte payload of 'a' under the "bytes" key and carries its own
+// index in the meta and data maps.
+func getBatchEvents(count int) []*event.Event {
+	payload := bytes.Repeat([]byte("a"), 2000)
+
+	var events []*event.Event
+	for i := 0; i < count; i++ {
+		suffix := strconv.Itoa(i)
+		e := &event.Event{
+			Name:      "event" + suffix,
+			Timestamp: time.Now(),
+			Meta: map[string]string{
+				"meta": "mval" + suffix,
+			},
+			Data: map[string]interface{}{
+				"data":  "dval" + suffix,
+				"bytes": payload,
+				"index": i,
+			},
+			Type:   event.LogEvent,
+			Remote: true,
+		}
+		events = append(events, e)
+	}
+	return events
+}
+
+// getNKData returns n KiB (n*1024 bytes) filled with the byte 'a'.
+func getNKData(n int) []byte {
+	const kib = 1024
+	return bytes.Repeat([]byte{'a'}, kib*n)
+}
+
+// getLargeEvent builds a single log event named "largeEvent" whose "data"
+// entry is an n KiB payload produced by getNKData.
+func getLargeEvent(n int) *event.Event {
+	large := new(event.Event)
+	large.Name = "largeEvent"
+	large.Timestamp = time.Now()
+	large.Meta = map[string]string{"meta": "largeEvent"}
+	large.Data = map[string]interface{}{"data": getNKData(n)}
+	large.Type = event.LogEvent
+	large.Remote = true
+	return large
+}
+
+// TestQueue_Normal pushes 10 events and expects to pop them back in the same
+// order with equal content.
+func TestQueue_Normal(t *testing.T) {
+	q, err := initMmapQueue(plugin.Config{
+		"queue_dir": "TestQueue_Normal",
+	})
+	if err != nil {
+		t.Fatalf("error in initializing the mmap queue: %v", err)
+	}
+	// register the cleanup only after a successful init, q is nil otherwise.
+	defer cleanTestQueue(t, q)
+	events := getBatchEvents(10)
+	for _, e := range events {
+		if err = q.Push(e); err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+	for i := 0; i < 10; i++ {
+		sequenceEvent, err := q.Pop()
+		if err != nil {
+			t.Errorf("error in fetching data from queue: %v", err)
+		} else if !cmp.Equal(events[i], sequenceEvent.Event) {
+			t.Errorf("history data and fetching data is not equal\n,history:%+v\n. pop data:%+v\n", events[i], sequenceEvent.Event)
+		}
+	}
+}
+
+// TestQueue_ReadHistory writes events in several rounds, closing and reopening
+// the queue between the rounds, and then reads them back in the same manner to
+// verify that a reopened queue continues from its history.
+func TestQueue_ReadHistory(t *testing.T) {
+	cfg := plugin.Config{
+		"queue_dir":    "TestQueue_ReadHistory",
+		"segment_size": 10240,
+	}
+
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("error in initializing the mmap queue: %v", err)
+	}
+	// register the cleanup only after a successful init, q is nil otherwise.
+	defer cleanTestQueue(t, q)
+	// close the queue to create a history empty queue.
+	if err := q.Close(); err != nil {
+		t.Fatalf("error in closing queue, %v", err)
+	}
+
+	// test cases.
+	batchSize := 10
+	batchNum := 100
+	events := getBatchEvents(batchSize * batchNum)
+
+	// Insert batchNum events in each of the batchSize rounds.
+	for i := 0; i < batchSize; i++ {
+		// recreate the queue
+		q, err := initMmapQueue(cfg)
+		if err != nil {
+			t.Fatalf("error in initializing the mmap queue: %v", err)
+		}
+		for j := 0; j < batchNum; j++ {
+			// every round owns batchNum consecutive events, so the stride is batchNum.
+			index := i*batchNum + j
+			if err = q.Push(events[index]); err != nil {
+				t.Errorf("queue cannot push one event: %+v", err)
+			}
+		}
+		if err := q.Close(); err != nil {
+			t.Fatalf("error in closing queue, %v", err)
+		}
+	}
+
+	// Read batchNum events in each of the batchSize rounds.
+	for i := 0; i < batchSize; i++ {
+		// recreate the queue
+		q, err := initMmapQueue(cfg)
+		if err != nil {
+			t.Fatalf("error in initializing the mmap queue: %v", err)
+		}
+		for j := 0; j < batchNum; j++ {
+			index := i*batchNum + j
+			sequenceEvent, err := q.Pop()
+			if err != nil {
+				t.Errorf("error in fetching data from queue: %v", err)
+			} else if cmp.Equal(events[index], sequenceEvent.Event) {
+				q.Ack(sequenceEvent.Offset)
+			} else {
+				t.Errorf("history data and fetching data is not equal\n,history:%+v\n. pop data:%+v\n", events[index], sequenceEvent.Event)
+			}
+		}
+		if err := q.Close(); err != nil {
+			t.Fatalf("error in closing queue, %v", err)
+		}
+	}
+}
+
+// TestQueue_PushOverCeilingMsg expects Push to reject a 20 KiB event when
+// max_event_size is 8 KiB.
+func TestQueue_PushOverCeilingMsg(t *testing.T) {
+	cfg := plugin.Config{
+		"queue_dir":      "TestQueue_PushOverCeilingMsg",
+		"segment_size":   10240,
+		"max_event_size": 1024 * 8,
+	}
+	largeEvent := getLargeEvent(20)
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	err = q.Push(largeEvent)
+	if err == nil {
+		t.Fatalf("The insertion of the over ceiling event is not as expected")
+	}
+	// report through the test log so the output is attributed to this test.
+	t.Logf("want err: %v", err)
+}
+
+// TestQueue_FlushWhenReachNum pushes exactly flush_ceiling_num events with a
+// long flush_period and expects the watermark offset to equal the writing
+// offset one second later.
+func TestQueue_FlushWhenReachNum(t *testing.T) {
+	q, err := initMmapQueue(plugin.Config{
+		"queue_dir":         "TestQueue_FlushWhenReachNum",
+		"segment_size":      10240,
+		"flush_ceiling_num": 5,
+		"flush_period":      1000 * 60,
+	})
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+
+	for _, e := range getBatchEvents(5) {
+		if err = q.Push(e); err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+	time.Sleep(time.Second)
+
+	writeID, writeOffset := q.meta.GetWritingOffset()
+	markID, markOffset := q.meta.GetWatermarkOffset()
+	if !(writeID == markID && writeOffset == markOffset) {
+		t.Fatalf("the flush operation was not invoking when reach the flush_ceiling_num.")
+	}
+}
+
+// TestQueue_FlushPeriod pushes fewer events than flush_ceiling_num and expects
+// the watermark offset to equal the writing offset once more than one
+// flush_period has elapsed.
+func TestQueue_FlushPeriod(t *testing.T) {
+	cfg := plugin.Config{
+		"queue_dir":         "TestQueue_FlushPeriod",
+		"segment_size":      10240,
+		"flush_ceiling_num": 50,
+		"flush_period":      1000 * 1,
+	}
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	events := getBatchEvents(5)
+
+	for _, e := range events {
+		err = q.Push(e)
+		if err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+	time.Sleep(time.Second * 2)
+	wID, wOffset := q.meta.GetWritingOffset()
+	wmID, wmOffset := q.meta.GetWatermarkOffset()
+	if wID != wmID || wOffset != wmOffset {
+		// this test is about the periodical flush, not about the ceiling number.
+		t.Fatalf("the flush operation was not invoked after the flush_period elapsed.")
+	}
+}
+
+// TestQueue_MemCost records q.mmapCount after each of 20 pushes and compares
+// the series with the expected trend for max_in_mem_segments = 8.
+func TestQueue_MemCost(t *testing.T) {
+	q, err := initMmapQueue(plugin.Config{
+		"queue_dir":           "TestQueue_MemCost",
+		"segment_size":        1024 * 4,
+		"max_in_mem_segments": 8,
+	})
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+
+	var memcost []int
+	for _, e := range getBatchEvents(20) {
+		err = q.Push(e)
+		// sample the counter even when the push failed to keep the series aligned.
+		memcost = append(memcost, q.mmapCount)
+		if err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+
+	want := []int{
+		1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 5, 6, 6, 7, 7, 8,
+	}
+	if cmp.Equal(want, memcost) {
+		return
+	}
+	t.Fatalf("the memory cost trends are not in line with expectations,\n want: %v,\n but got: %v", want, memcost)
+}
+
+// TestQueue_OverSegmentEvent pushes one 10 KiB event into 4 KiB segments and
+// expects the writing segment id to be size*1024/SegmentSize afterwards.
+func TestQueue_OverSegmentEvent(t *testing.T) {
+	cfg := plugin.Config{
+		"queue_dir":    "TestQueue_OverSegmentEvent",
+		"segment_size": 1024 * 4,
+	}
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	size := 10
+	wantPos := size * 1024 / q.SegmentSize
+	largeEvent := getLargeEvent(size)
+	err = q.Push(largeEvent)
+	if err != nil {
+		t.Errorf("queue cannot push one event: %+v", err)
+	}
+	id, _ := q.meta.GetWritingOffset()
+	if int(id) != wantPos {
+		// the expected position goes first, the actual one second.
+		t.Fatalf("the writing offset id should at %d segment, but at %d", wantPos, id)
+	}
+}
+
+// TestQueue_ReusingFiles alternates 100 pushes and pops on a queue with only 5
+// segments and expects both the reading and the writing segment id to end up
+// beyond the queue capacity.
+func TestQueue_ReusingFiles(t *testing.T) {
+	q, err := initMmapQueue(plugin.Config{
+		"queue_dir":               "TestQueue_ReusingFiles",
+		"segment_size":            1024 * 4,
+		"queue_capacity_segments": 5,
+		"max_event_size":          1024 * 3,
+	})
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+
+	const rounds = 100
+	for round := 0; round < rounds; round++ {
+		if err = q.Push(getLargeEvent(2)); err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+		if _, popErr := q.Pop(); popErr != nil {
+			t.Errorf("error in fetching data from queue: %v", popErr)
+		}
+	}
+
+	rid, roffset := q.meta.GetReadingOffset()
+	wid, woffset := q.meta.GetWritingOffset()
+	fmt.Printf("rid:%d, roffset:%d, wid:%d, woffset:%d\n", rid, roffset, wid, woffset)
+	capacity := q.QueueCapacitySegments
+	if int(wid) <= capacity || int(rid) <= capacity {
+		t.Errorf("cannot valid reusing files")
+	}
+}
+
+// TestQueue_Empty drains a queue holding 3 events and then pops once more.
+// An error from that extra pop is only accepted when it is the "queue is
+// empty" error.
+func TestQueue_Empty(t *testing.T) {
+	cfg := plugin.Config{
+		// use a directory of its own instead of sharing the one of TestQueue_ReusingFiles.
+		"queue_dir":               "TestQueue_Empty",
+		"segment_size":            1024 * 4,
+		"queue_capacity_segments": 10,
+	}
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	for _, e := range getBatchEvents(3) {
+		err = q.Push(e)
+		if err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+	for i := 0; i < 3; i++ {
+		if _, err = q.Pop(); err != nil {
+			t.Errorf("error in fetching data from queue: %v", err)
+		}
+	}
+	_, err = q.Pop()
+	if err != nil && err.Error() != "cannot read data when the queue is empty" {
+		t.Fatalf("not except err: %v", err)
+	}
+}
+
+// TestQueue_Full pushes 8 events into a queue of 10 segments and then pushes a
+// 2 KiB event. An error from that last push is only accepted when it is the
+// "queue is full" error.
+func TestQueue_Full(t *testing.T) {
+	cfg := plugin.Config{
+		// use a directory of its own instead of sharing the one of TestQueue_ReusingFiles.
+		"queue_dir":               "TestQueue_Full",
+		"segment_size":            1024 * 4,
+		"queue_capacity_segments": 10,
+	}
+	q, err := initMmapQueue(cfg)
+	if err != nil {
+		t.Fatalf("cannot get a mmap queue: %v", err)
+	}
+	defer cleanTestQueue(t, q)
+	for _, e := range getBatchEvents(8) {
+		err = q.Push(e)
+		if err != nil {
+			t.Errorf("queue cannot push one event: %+v", err)
+		}
+	}
+	err = q.Push(getLargeEvent(2))
+	if err != nil && err.Error() != "cannot push data when the queue is full" {
+		t.Fatalf("not except err: %v", err)
+	}
+}
diff --git a/plugins/queue/mmap/segment/segment.go b/plugins/queue/mmap/segment/segment.go
new file mode 100644
index 0000000..cf738d1
--- /dev/null
+++ b/plugins/queue/mmap/segment/segment.go
@@ -0,0 +1,68 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package segment
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"syscall"
+
+	"github.com/grandecola/mmap"
+)
+
+const FileSuffix = "_segment.dat"
+
+// NewSegment returns a pointer to a memory mapped file according to the given file name and file size.
+// The size of each segment file should be a multiple of the page size.
+//
+// The parent directories and the file itself are created when missing, and the
+// file is truncated or extended to exactly size bytes before it is mapped.
+// The file descriptor is closed before returning, on success as well as on
+// every error path.
+func NewSegment(name string, size int) (*mmap.File, error) {
+	name, err := filepath.Abs(name)
+	if err != nil {
+		return nil, fmt.Errorf("error in getting the absolute path of the segment file : %v", err)
+	}
+	paths, _ := filepath.Split(name)
+	if _, statErr := os.Stat(paths); statErr != nil && os.IsNotExist(statErr) {
+		// report the MkdirAll failure itself rather than the "not exist" stat error.
+		if mkErr := os.MkdirAll(paths, 0744); mkErr != nil {
+			return nil, fmt.Errorf("error in creating the parent dirs of the segment file : %v", mkErr)
+		}
+	}
+
+	file, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0744)
+	if err != nil {
+		return nil, fmt.Errorf("error in opening segment file : %v", err)
+	}
+
+	stat, err := file.Stat()
+	if err != nil {
+		_ = file.Close() // best effort, the stat error is the one worth reporting.
+		return nil, fmt.Errorf("error in reading info of the segment file : %v", err)
+	}
+
+	if stat.Size() != int64(size) {
+		if truncErr := file.Truncate(int64(size)); truncErr != nil {
+			_ = file.Close() // best effort, the truncate error is the one worth reporting.
+			return nil, fmt.Errorf("error in truncating file: %v", truncErr)
+		}
+	}
+
+	segment, err := mmap.NewSharedFileMmap(file, 0, size, syscall.PROT_READ|syscall.PROT_WRITE)
+	if err != nil {
+		_ = file.Close() // best effort, the mmap error is the one worth reporting.
+		return nil, fmt.Errorf("error in creating the mmap segment file : %v", err)
+	}
+	if err := file.Close(); err != nil {
+		// the caller never receives the mapping on this path, release it here.
+		_ = segment.Unmap()
+		return nil, fmt.Errorf("error in closing the segment file : %v", err)
+	}
+	return segment, nil
+}
diff --git a/plugins/queue/mmap/segment/segment_test.go b/plugins/queue/mmap/segment/segment_test.go
new file mode 100644
index 0000000..98c38e6
--- /dev/null
+++ b/plugins/queue/mmap/segment/segment_test.go
@@ -0,0 +1,187 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package segment
+
+import (
+	"os"
+	"testing"
+
+	"github.com/grandecola/mmap"
+)
+
+// Test_newSegmentWithOldFile creates a file of originalSize beforehand and
+// expects NewSegment to resize it to neededSize, whether the existing file is
+// smaller than, equal to or larger than the needed size.
+func Test_newSegmentWithOldFile(t *testing.T) {
+	type args struct {
+		fileName     string
+		originalSize int
+		neededSize   int
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    int64
+		wantErr bool
+	}{
+		{
+			name: "less than the needed size",
+			args: args{
+				fileName:     "temp.segment",
+				originalSize: os.Getpagesize(),
+				neededSize:   os.Getpagesize() * 2,
+			},
+			want:    int64(os.Getpagesize() * 2),
+			wantErr: false,
+		},
+		{
+			name: "equal to the needed size",
+			args: args{
+				fileName:     "temp2.segment",
+				originalSize: os.Getpagesize() * 2,
+				neededSize:   os.Getpagesize() * 2,
+			},
+			want:    int64(os.Getpagesize() * 2),
+			wantErr: false,
+		},
+		{
+			name: "larger than the needed size",
+			args: args{
+				fileName:     "temp3.segment",
+				originalSize: os.Getpagesize() * 3,
+				neededSize:   os.Getpagesize() * 2,
+			},
+			want:    int64(os.Getpagesize() * 2),
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			file, err := os.Create(tt.args.fileName)
+			if err != nil {
+				t.Errorf("cannot create the original file: %v", err)
+				return
+			}
+			if err := file.Truncate(int64(tt.args.originalSize)); err != nil {
+				t.Errorf("cannot set the original file size: %v", err)
+			}
+			// release the handle, NewSegment opens the file on its own.
+			if err := file.Close(); err != nil {
+				t.Errorf("cannot close the original file: %v", err)
+			}
+
+			got, err := NewSegment(tt.args.fileName, tt.args.neededSize)
+			defer clean(got, tt.args.fileName, t)
+
+			if (err != nil) != tt.wantErr {
+				t.Errorf("NewSegment() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			// os.Stat reads the size without leaving an open file handle behind.
+			if stat, err := os.Stat(tt.args.fileName); err != nil {
+				t.Errorf("cannot read mmap file info: %v", err)
+			} else if stat.Size() != tt.want {
+				t.Errorf("want file size is %d ,but got %d", tt.want, stat.Size())
+			}
+		})
+	}
+}
+
+// Test_newSegmentSize expects NewSegment to create a file of exactly the
+// requested size when the file does not exist yet.
+func Test_newSegmentSize(t *testing.T) {
+	type args struct {
+		fileName string
+		size     int
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    int64
+		wantErr bool
+	}{
+		{
+			name: "equal to page size",
+			args: args{
+				fileName: "temp2.segment",
+				size:     os.Getpagesize() * 2,
+			},
+			want:    int64(os.Getpagesize() * 2),
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := NewSegment(tt.args.fileName, tt.args.size)
+			defer clean(got, tt.args.fileName, t)
+
+			if (err != nil) != tt.wantErr {
+				t.Errorf("NewSegment() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			// os.Stat reads the size without leaving an open file handle behind.
+			if stat, err := os.Stat(tt.args.fileName); err != nil {
+				t.Errorf("cannot read mmap file info: %v", err)
+			} else if stat.Size() != tt.want {
+				t.Errorf("want file size is %d ,but got %d", tt.want, stat.Size())
+			}
+		})
+	}
+}
+
+// Test_newSegmentMultiDir expects NewSegment to succeed for a file inside a
+// directory that does not exist yet. The directory is removed afterwards.
+func Test_newSegmentMultiDir(t *testing.T) {
+	const testDir = "testQueue"
+	defer func() {
+		err := os.RemoveAll(testDir)
+		if err != nil {
+			t.Errorf("cannot clean the testqueue dir: %v", err)
+		}
+	}()
+
+	type args struct {
+		fileName string
+		size     int
+	}
+	type testCase struct {
+		name    string
+		args    args
+		wantErr bool
+	}
+	cases := []testCase{
+		{
+			name:    "test multi dir",
+			args:    args{fileName: testDir + "/temp.segment", size: 10},
+			wantErr: false,
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			seg, err := NewSegment(tc.args.fileName, tc.args.size)
+			defer clean(seg, tc.args.fileName, t)
+			gotErr := err != nil
+			if gotErr != tc.wantErr {
+				t.Errorf("NewSegment() error = %v, wantErr %v", err, tc.wantErr)
+			}
+		})
+	}
+}
+
+// clean unmaps the given segment and deletes its file. It does nothing when
+// file is nil; both failures are reported on t without stopping the test.
+func clean(file *mmap.File, fileName string, t *testing.T) {
+	if file != nil {
+		unmapErr := file.Unmap()
+		if unmapErr != nil {
+			t.Errorf("unmap segment file error: %v", unmapErr)
+		}
+		removeErr := os.Remove(fileName)
+		if removeErr != nil {
+			t.Errorf("delete segment file error: %v", removeErr)
+		}
+	}
+}
diff --git a/plugins/queue/mmap/segment_operation.go b/plugins/queue/mmap/segment_operation.go
new file mode 100644
index 0000000..9aa24d9
--- /dev/null
+++ b/plugins/queue/mmap/segment_operation.go
@@ -0,0 +1,142 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"strconv"
+
+	"github.com/grandecola/mmap"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap/segment"
+)
+
+// GetSegment returns the memory mapped file that backs segmentID, mapping it
+// first when necessary. When mmapCount has reached MaxInMemSegments it signals
+// insufficientMemChannel and waits for a reply on sufficientMemChannel before
+// mapping.
+func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
+	slot := q.GetIndex(segmentID)
+	if q.mmapCount >= q.MaxInMemSegments {
+		q.insufficientMemChannel <- struct{}{}
+		<-q.sufficientMemChannel
+	}
+	err := q.mapSegment(segmentID)
+	if err != nil {
+		return nil, err
+	}
+	mapped := q.segments[slot]
+	if mapped == nil {
+		return nil, fmt.Errorf("cannot get a memory mapped file at %d segment", segmentID)
+	}
+	return mapped, nil
+}
+
+// mapSegment makes sure the slot of segmentID holds a memory mapped file.
+// An already mapped slot is left untouched; otherwise the file
+// "<index>_segment.dat" under QueueDir is mapped and mmapCount is increased.
+func (q *Queue) mapSegment(segmentID int64) error {
+	slot := q.GetIndex(segmentID)
+	if q.segments[slot] == nil {
+		fileName := strconv.Itoa(slot) + segment.FileSuffix
+		mapped, err := segment.NewSegment(path.Join(q.QueueDir, fileName), q.SegmentSize)
+		if err != nil {
+			return err
+		}
+		q.mmapCount++
+		q.segments[slot] = mapped
+	}
+	return nil
+}
+
+// unmapSegment releases the memory mapping held in the slot of segmentID.
+// A slot that is not mapped is left untouched; otherwise the mapping is
+// removed, the slot is cleared and mmapCount is decreased.
+func (q *Queue) unmapSegment(segmentID int64) error {
+	slot := q.GetIndex(segmentID)
+	mapped := q.segments[slot]
+	if mapped != nil {
+		if err := mapped.Unmap(); err != nil {
+			return fmt.Errorf("error in unmap segemnt: %v", err)
+		}
+		q.mmapCount--
+		q.segments[slot] = nil
+	}
+	return nil
+}
+
+// segmentSwapper is meant to run in its own goroutine and keeps the number of
+// memory mapped segments bounded.
+//
+// For every signal received on insufficientMemChannel it calls doSwap when
+// mmapCount has reached MaxInMemSegments, logs a failure of doSwap, and then
+// always answers on sufficientMemChannel so that the signalling side can
+// continue. It returns once the context derived from q.ctx is done and marks
+// showDownWg as done on the way out.
+func (q *Queue) segmentSwapper() {
+	defer q.showDownWg.Done()
+	ctx, cancel := context.WithCancel(q.ctx)
+	defer cancel()
+	for {
+		select {
+		case <-q.insufficientMemChannel:
+			// re-check the condition, the signal alone does not prove that a swap is needed.
+			if q.mmapCount >= q.MaxInMemSegments {
+				if q.doSwap() != nil {
+					log.Logger.Errorf("cannot get enough memory to receive new data")
+				}
+			}
+			// reply even when the swap failed, the sender is blocked until it gets an answer.
+			q.sufficientMemChannel <- struct{}{}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// doSwap unmaps memory mapped segments until enough slots are free again, to
+// bound the memory cost of the queue.
+//
+// It never unmaps the slots of the current reading and writing segments. Each
+// round first walks upwards over the MaxInMemSegments ids right before the
+// reading id, then downwards over the MaxInMemSegments ids right before the
+// writing id. Both ids are shifted by QueueCapacitySegments beforehand, which
+// leaves the slot index unchanged (see GetIndex).
+//
+// It returns nil as soon as the number of free slots
+// (MaxInMemSegments - mmapCount) reaches MaxInMemSegments/2 - 1, or the first
+// error reported by unmapSegment.
+//
+// NOTE(review): when neither walk finds a mapped slot to release, mmapCount
+// does not change and the outer loop looks like it never terminates - confirm.
+func (q *Queue) doSwap() error {
+	rID, _ := q.meta.GetReadingOffset()
+	wID, _ := q.meta.GetWritingOffset()
+	logicRID := rID + int64(q.QueueCapacitySegments)
+	logicWID := wID + int64(q.QueueCapacitySegments)
+	wIndex := q.GetIndex(wID)
+	rIndex := q.GetIndex(rID)
+	for q.mmapCount >= q.MaxInMemSegments {
+		// first pass: the ids right before the reading id, in ascending order.
+		for i := logicRID - int64(q.MaxInMemSegments); i >= 0 && i < logicRID; i++ {
+			if q.GetIndex(i) == wIndex || q.GetIndex(i) == rIndex {
+				continue
+			}
+
+			if err := q.unmapSegment(i); err != nil {
+				return err
+			}
+			// the writing segment and the reading segment should stay in memory.
+			// q.MaxInMemSegments/2-1 means keeping half of the slots available to receive new data.
+			if q.MaxInMemSegments-q.mmapCount >= q.MaxInMemSegments/2-1 {
+				return nil
+			}
+		}
+		// second pass: the ids right before the writing id, in descending order.
+		for i := logicWID - 1; i >= 0 && i >= logicWID-int64(q.MaxInMemSegments); i-- {
+			if q.GetIndex(i) == wIndex || q.GetIndex(i) == rIndex {
+				continue
+			}
+
+			if err := q.unmapSegment(i); err != nil {
+				return err
+			}
+			// the writing segment and the reading segment should stay in memory.
+			// q.MaxInMemSegments/2-1 means keeping half of the slots available to receive new data.
+			if q.MaxInMemSegments-q.mmapCount >= q.MaxInMemSegments/2-1 {
+				return nil
+			}
+		}
+	}
+	return nil
+}
+
+// GetIndex returns the slot in q.segments that belongs to segmentID, which is
+// segmentID modulo QueueCapacitySegments.
+func (q *Queue) GetIndex(segmentID int64) int {
+	// reduce in int64 first: narrowing the id to int before the modulo would
+	// truncate large ids on platforms where int is 32 bits wide.
+	return int(segmentID % int64(q.QueueCapacitySegments))
+}
diff --git a/plugins/queue/mmap/serializer.go b/plugins/queue/mmap/serializer.go
new file mode 100644
index 0000000..38a5dd1
--- /dev/null
+++ b/plugins/queue/mmap/serializer.go
@@ -0,0 +1,74 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import (
+	"bytes"
+
+	"encoding/gob"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/event"
+)
+
+// Decoder is used in the pop operation to reuse one gob.Decoder together with
+// the buffer it reads from.
+type Decoder struct {
+	buf     *bytes.Buffer // input of decoder, filled with the bytes of one event per deserialize call.
+	decoder *gob.Decoder  // reads from buf.
+}
+
+// Encoder is used in the push operation to reuse one gob.Encoder together with
+// the buffer it writes to.
+type Encoder struct {
+	buf     *bytes.Buffer // output of encoder, reset after every serialize call.
+	encoder *gob.Encoder  // writes to buf.
+}
+
+// NewDecoder returns a Decoder whose gob.Decoder reads from a fresh buffer.
+func NewDecoder() *Decoder {
+	d := &Decoder{buf: new(bytes.Buffer)}
+	d.decoder = gob.NewDecoder(d.buf)
+	return d
+}
+
+// NewEncoder returns an Encoder whose gob.Encoder writes to a fresh buffer.
+func NewEncoder() *Encoder {
+	e := &Encoder{buf: new(bytes.Buffer)}
+	e.encoder = gob.NewEncoder(e.buf)
+	return e
+}
+
+// deserialize feeds b to the reused gob decoder and returns the decoded event.
+// The internal buffer is reset before returning, whether decoding succeeded
+// or not.
+func (d *Decoder) deserialize(b []byte) (*event.Event, error) {
+	defer d.buf.Reset()
+	d.buf.Write(b)
+	decoded := new(event.Event)
+	if err := d.decoder.Decode(decoded); err != nil {
+		return nil, err
+	}
+	return decoded, nil
+}
+
+// serialize encodes data with the reused gob encoder and returns the encoded
+// bytes. The internal buffer is reset before returning, whether encoding
+// succeeded or not.
+//
+// The returned slice is a copy owned by the caller: the slice handed out by
+// bytes.Buffer.Bytes is only valid until the next modification of the buffer,
+// so returning it directly would let the next serialize call overwrite it.
+func (e *Encoder) serialize(data *event.Event) ([]byte, error) {
+	defer e.buf.Reset()
+	err := e.encoder.Encode(data)
+	if err != nil {
+		return nil, err
+	}
+	out := make([]byte, e.buf.Len())
+	copy(out, e.buf.Bytes())
+	return out, nil
+}
diff --git a/plugins/queue/api/queue_repository.go b/plugins/queue/queue_repository.go
similarity index 76%
rename from plugins/queue/api/queue_repository.go
rename to plugins/queue/queue_repository.go
index 11031d0..1c49822 100644
--- a/plugins/queue/api/queue_repository.go
+++ b/plugins/queue/queue_repository.go
@@ -15,26 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package api
+package queue
 
 import (
 	"reflect"
 
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/plugins/queue/api"
+	"github.com/apache/skywalking-satellite/plugins/queue/mmap"
 )
 
-// GetQueue an initialized filter plugin.
-func GetQueue(config plugin.Config) Queue {
-	return plugin.Get(reflect.TypeOf((*Queue)(nil)).Elem(), config).(Queue)
-}
-
 // RegisterQueuePlugins register the used queue plugins.
 func RegisterQueuePlugins() {
-	plugin.RegisterPluginCategory(reflect.TypeOf((*Queue)(nil)).Elem())
-	queues := []Queue{
+	plugin.RegisterPluginCategory(reflect.TypeOf((*api.Queue)(nil)).Elem())
+	queues := []api.Queue{
+		&mmap.Queue{},
 		// Please register the queue plugins at here.
 	}
-	for _, queue := range queues {
-		plugin.RegisterPlugin(queue)
+	for _, q := range queues {
+		plugin.RegisterPlugin(q)
 	}
 }
diff --git a/plugins/receiver/api/receiver.go b/plugins/receiver/api/receiver.go
index 9e47692..ab04d9c 100644
--- a/plugins/receiver/api/receiver.go
+++ b/plugins/receiver/api/receiver.go
@@ -31,5 +31,5 @@ type Receiver interface {
 	RegisterHandler(server api.Server)
 
 	// Channel would be put a data when the receiver receives an APM data.
-	Channel() <-chan event.SerializableEvent
+	Channel() <-chan *event.Event
 }