You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2021/02/19 18:31:14 UTC

[skywalking-satellite] branch polish-codes created (now 4c47a62)

This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a change to branch polish-codes
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git.


      at 4c47a62  polish codes for release & add performance test doc

This branch includes the following new commits:

     new 4c47a62  polish codes for release & add performance test doc

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-satellite] 01/01: polish codes for release & add performance test doc

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch polish-codes
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git

commit 4c47a621b4c776771c134a910f32c4db4755734d
Author: Evan <ev...@outlook.com>
AuthorDate: Sat Feb 20 02:27:55 2021 +0800

    polish codes for release & add performance test doc
---
 CHANGES.md                                         | 20 +++++--
 Makefile                                           | 11 +++-
 README.md                                          |  6 ++
 configs/satellite_config.yaml                      | 51 +++++++++++++----
 docs/en/FAQ/performance.md                         | 41 ++++++++++++++
 docs/en/concepts-and-designs/README.md             |  1 +
 docs/en/concepts-and-designs/mmap-queue.md         | 65 ++++++++++++++++++++++
 docs/en/setup/plugins/plugin-list.md               |  2 +-
 docs/en/setup/plugins/queue_mmap-queue.md          |  6 +-
 ...iver.md => receiver_http-nativelog-receiver.md} |  2 +-
 internal/pkg/plugin/registry.go                    | 12 +++-
 internal/satellite/boot/boot.go                    |  9 ++-
 internal/satellite/config/loader_test.go           | 32 ++++++-----
 internal/satellite/module/gatherer/create.go       |  3 -
 .../satellite/module/gatherer/fetcher_gatherer.go  | 28 +++-------
 .../satellite/module/gatherer/receiver_gatherer.go | 51 +++++++++--------
 internal/satellite/module/processor/create.go      |  2 -
 internal/satellite/module/processor/processor.go   |  2 +-
 internal/satellite/module/sender/create.go         |  2 -
 internal/satellite/module/sender/sender.go         | 28 +++++-----
 internal/satellite/telemetry/collector.go          | 22 ++++++++
 .../telemetry/{telemetry.go => config.go}          | 14 +----
 internal/satellite/telemetry/counter.go            | 57 +++++++++++++++++++
 internal/satellite/telemetry/telemetry.go          | 47 ++++++++--------
 plugins/queue/memory/queue.go                      |  5 +-
 plugins/queue/mmap/branchmark_test.go              |  9 ++-
 plugins/queue/mmap/queue.go                        | 27 ++++-----
 plugins/queue/mmap/queue_operation.go              |  4 ++
 plugins/queue/mmap/queue_test.go                   | 16 +-----
 plugins/queue/mmap/segment_operation.go            |  8 ++-
 plugins/receiver/http/nativcelog/receiver.go       |  4 +-
 plugins/receiver/http/nativcelog/receiver_test.go  |  2 +-
 plugins/server/grpc/server.go                      |  3 +-
 plugins/server/http/server.go                      |  3 +-
 plugins/server/prometheus/prometheus.go            |  7 ++-
 tools/docker_build.sh                              |  2 +
 36 files changed, 417 insertions(+), 187 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 2ddab4f..1b4032e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -4,11 +4,23 @@ Release Notes.
 
 0.1.0
 ------------------
-#### Project
-
 #### Features
+* Build the Satellite core structure.
+* Add prometheus self telemetry.
+* Add kafka client plugin.
+* Add none-fallbacker plugin.
+* Add timer-fallbacker plugin.
+* Add nativelog-kafka-forwarder plugin.
+* Add memory-queue plugin.
+* Add mmap-queue plugin.
+* Add grpc-nativelog-receiver plugin.
+* Add http-nativelog-receiver plugin.
+* Add grpc-server plugin.
+* Add http-server plugin.
+* Add prometheus-server plugin.
 
 #### Bug Fixes
 
-All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/64?closed=1)
-
+#### Issues and PR
+- All issues  are [here](https://github.com/apache/skywalking/milestone/64?closed=1)  
+- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Aopen+is%3Apr+milestone%3A0.1.0)
diff --git a/Makefile b/Makefile
index 8634797..1755ed8 100644
--- a/Makefile
+++ b/Makefile
@@ -22,6 +22,8 @@ BINARY = skywalking-satellite
 RELEASE_BIN = skywalking-satellite-$(VERSION)-bin
 RELEASE_SRC = skywalking-satellite-$(VERSION)-src
 
+PLUGIN_DOC_DIR = docs/en/setup/plugins
+
 OSNAME := $(if $(findstring Darwin,$(shell uname)),darwin,linux)
 
 SH = sh
@@ -73,11 +75,16 @@ clean: tools
 	-rm -rf coverage.txt
 
 .PHONY: build
-build: clean deps linux darwin windows
+build: clean deps
+	rm -rf bin/*
+	make linux
+	make darwin
+	make windows
 
 .PHONY: check
 check: clean
-	$(OUT_DIR)/$(BINARY)-$(VERSION)-$(OSNAME)-$(ARCH) docs --output=docs/en/setup/plugins
+	rm -rf $(PLUGIN_DOC_DIR)
+	$(OUT_DIR)/$(BINARY)-$(VERSION)-$(OSNAME)-$(ARCH) docs --output=$(PLUGIN_DOC_DIR)
 	$(GO) mod tidy > /dev/null
 	@if [ ! -z "`git status -s`" ]; then \
 		echo "Following files are not consistent with CI:"; \
diff --git a/README.md b/README.md
index f46a0d9..d2d28da 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,12 @@ Apache SkyWalking Satellite
 
 NOTICE, SkyWalking Satellite uses [v3 protocols](https://github.com/apache/skywalking/blob/master/docs/en/protocols/README.md). They are incompatible with previous SkyWalking releases before SkyWalking 8.0.
 
+# Performance
+- 0.5 core supported 3000 ops throughput with memory queue.
+- 0.5 core supported 1500 ops throughput with the memory mapped queue(Ensure data stability).
+
+Please read [the doc](./docs/en/FAQ/performance.md) to get more details.
+
 # Download
 Go to the [download page](https://skywalking.apache.org/downloads/) to download all available binaries, including MacOS, Linux and Windows. Due to system compatibility problems, some plugins of SkyWalking Satellite cannot be used in Windows system. Check [the corresponding documentation](./docs/en/setup/plugins) to see whether the plugin is available on Windows.
 
diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index 407b4bc..19bf595 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -17,48 +17,77 @@
 
 # The logger configuration.
 logger:
+  # The log format pattern configuration.
   log_pattern: ${SATELLITE_LOGGER_LOG_PATTERN:%time [%level][%field] - %msg}
+  # The time format pattern configuration.
   time_pattern: ${SATELLITE_LOGGER_TIME_PATTERN:2006-01-02 15:04:05.000}
+  # The lowest level of printing allowed.
   level: ${SATELLITE_LOGGER_LEVEL:info}
 
 # The Satellite self telemetry configuration.
 telemetry:
-  cluster: ${SATELLITE_TELEMETRY_CLUSTER:default-cluster}
-  service: ${SATELLITE_TELEMETRY_SERVICE:default-service}
-  instance: ${SATELLITE_TELEMETRY_SERVICE:default-instance}
+  # The space concept for the deployment, such as the namespace concept in the Kubernetes.
+  cluster: ${SATELLITE_TELEMETRY_CLUSTER:satellite-cluster}
+  # The group concept for the deployment, such as the service resource concept in the Kubernetes.
+  service: ${SATELLITE_TELEMETRY_SERVICE:satellite-service}
+  # The minimum running unit, such as the pod concept in the Kubernetes.
+  instance: ${SATELLITE_TELEMETRY_SERVICE:satellite-instance}
 
 # The sharing plugins referenced by the specific plugins in the different pipes.
 sharing:
   clients:
     - plugin_name: "kafka-client"
+      # The Kafka broker addresses (default localhost:9092). Multiple values are separated by commas.
       brokers: ${SATELLITE_KAFKA_CLIENT_BROKERS:127.0.0.1:9092}
+      # The Kakfa version should follow this pattern, which is major_minor_veryMinor_patch.
       version: ${SATELLITE_KAFKA_VERSION:"2.1.1"}
+      # The TLS switch
+      enable_TLS: ${SATELLITE_KAFKA_ENABLE_TLS:false}
+      # The file path of client.pem. The config only works when opening the TLS switch.
+      client_pem_path: ${SATELLITE_KAFKA_CLIENT_PEM_PATH:"client.pem"}
+      # The file path of client.key. The config only works when opening the TLS switch.
+      client_key_path: ${SATELLITE_KAFKA_CLIENT_KEY_PATH:"client.key"}
+      # The file path oca.pem. The config only works when opening the TLS switch.
+      ca_pem_path: ${SATELLITE_KAFKA_CA_PEM_PATH:"ca.pem"}
   servers:
     - plugin_name: "grpc-server"
+      # The address of grpc server.
+      address: ${SATELLITE_GRPC_ADDRESS:":11800"}
+      # The TLS cert file path.
+      tls_cert_file: ${SATELLITE_GRPC_TLS_KEY_FILE:""}
+      # The TLS key file path.
+      tls_key_file: ${SATELLITE_GRPC_TLS_KEY_FILE:""}
     - plugin_name: "prometheus-server"
-      address: ${SATELLITE_PROMETHEUS_ADDRESS:":8090"}
-# The working pipes.
+      # The prometheus server address.
+      address: ${SATELLITE_PROMETHEUS_ADDRESS:":1234"}
+      # The prometheus server metrics endpoint.
+      endpoint: ${SATELLITE_PROMETHEUS_ENDPOINT:"/metrics"}
+
+# The working pipe configurations.
 pipes:
   - common_config:
-      pipe_name: pipe1
+      pipe_name: logpipe
     gatherer:
       server_name: "grpc-server"
       receiver:
         plugin_name: "grpc-nativelog-receiver"
       queue:
-        plugin_name: "mmap-queue"
-        segment_size: ${SATELLITE_MMAP_QUEUE_SIZE:524288}
-        max_in_mem_segments: ${SATELLITE_MMAP_QUEUE_MAX_IN_MEM_SEGMENTS:6}
-        queue_dir: "pipe1-log-grpc-receiver-queue"
+        plugin_name: "memory-queue"
+        # The maximum buffer event size.
+        event_buffer_size: ${SATELLITE_QUEUE_EVENT_BUFFER_SIZE:5000}
     processor:
       filters:
     sender:
       fallbacker:
         plugin_name: none-fallbacker
-      flush_time: ${SATELLITE_PIPE1_SENDER_FLUSH_TIME:1000}
+      # The time interval between two flush operations. And the time unit is millisecond.
+      flush_time: ${SATELLITE_LOGPIPE_SENDER_FLUSH_TIME:1000}
+      # The maximum buffer elements.
       max_buffer_size: ${SATELLITE_PIPE1_SENDER_MAX_BUFFER_SIZE:200}
+      # The minimum flush elements.
       min_flush_events: ${SATELLITE_PIPE1_SENDER_MIN_FLUSH_EVENTS:100}
       client_name: kafka-client
       forwarders:
         - plugin_name: nativelog-kafka-forwarder
+          # The remote kafka topic.
           topic: ${SATELLITE_NATIVELOG-TOPIC:log-topic}
diff --git a/docs/en/FAQ/performance.md b/docs/en/FAQ/performance.md
new file mode 100644
index 0000000..0d8cd41
--- /dev/null
+++ b/docs/en/FAQ/performance.md
@@ -0,0 +1,41 @@
+# What is the performance of the Satellite?
+## Performance
+The performance reduction of the mmap-queue  is mainly due to the file persistent operation to ensure data stability. However, the queue is used to collect some core telemetry data. We will continue to optimize the performance of this queue.
+
+- 0.5 core supported 3000 ops throughput with memory queue.
+- 0.5 core supported 1500 ops throughput with the memory mapped queue(Ensure data stability).
+
+
+## Details
+### Testing environment
+1. machine: 
+    -  cpu: INTEL Xeon E5-2650 V4 12C 2.2GHZ * 2
+    - memory: INVENTEC PC4-19200 * 8
+    - harddisk: INVENTEC SATA 4T 7.2K * 8
+2. Kafka: 
+    - region: the same region with the test machine in Baidu Cloud.
+    - version.: 0.1.1.0
+3. The input plugin: grpc-nativelog-receiver
+4. resource limit:
+    - cpu: 500m(0.5 core)
+    - memory: 100M
+
+### Performance Test With Memory Queue
+|  Qps   |stack memory in use| heap memory in use  |no-heap memory in use | 
+|  ----  | ----  | ----  | ----  |
+| 400  | 2.13M | 11M |83K|
+| 800  | 2.49M | 13.4M |83K|
+| 1200  | 2.72M | 13.4M |83K|
+| 1600  | 2.85M | 16.2M |83K|
+| 2000  | 2.92M | 17.6M |83K|
+| 2400  | 2.98M | 18.3M |83K|
+| 2800  | 3.54M | 26.8M |83K|
+| 3000  | 3.34M | 28M |83K|
+
+### Performance Test With Mmap Queue
+|  Qps   |stack memory in use| heap memory in use  |no-heap memory in use | 
+|  ----  | ----  | ----  | ----  |
+| 400  | 2.39M | 9.5M |83K|
+| 800  | 2.43M | 12.1M |83K|
+| 1200  | 2.49M | 12M |83K|
+| 1600  | 2.62M | 13.3M |83K|
diff --git a/docs/en/concepts-and-designs/README.md b/docs/en/concepts-and-designs/README.md
index 89155c0..084d53c 100644
--- a/docs/en/concepts-and-designs/README.md
+++ b/docs/en/concepts-and-designs/README.md
@@ -11,3 +11,4 @@ you are interested, then dive in.
 - [Module Design](./module_design.md)
 - [Plugin mechanism](./plugin_mechanism.md)
 - [Project Structure](./project_structue.md)
+- [The design of the memory mapped queue](./mmap-queue.md)
diff --git a/docs/en/concepts-and-designs/mmap-queue.md b/docs/en/concepts-and-designs/mmap-queue.md
new file mode 100644
index 0000000..df0eaf0
--- /dev/null
+++ b/docs/en/concepts-and-designs/mmap-queue.md
@@ -0,0 +1,65 @@
+# Design
+The mmap-queue is a big, fast, and persistent queue based on the memory-mapped files. One mmap-queue has a directory to store the whole data. The queue directory is made up of many segments and 1 metafile. This is originally implemented by [bigqueue](https://github.com/grandecola/bigqueue) project, we changed it a little for fitting the Satellite project requirements.
+
+- Segment: Segment is the real data store center, that provides large-space storage and does not reduce read and write performance as much as possible by using mmap. And we will avoid deleting files by reusing them.
+- Meta: The purpose of meta is to find the data that the consumer needs.
+
+## Meta
+Metadata only needs 80B to store the Metadata for the pipe. But for memory alignment, it takes at least one memory page size, which is generally 4K.
+```
+[    8Bit   ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit ][  8Bit  ]
+[metaVersion][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][  ID   ][ offset][capacity]
+[metaVersion][writing   offset][watermark offset][committed offset][reading   offset][capacity]
+
+```
+### Transforming
+
+![](https://skywalking.apache.org/blog/2020-11-25-skywalking-satellite-0.1.0-design/offset-convert.jpg)
+
+
+## BenchmarkTest
+Test machine: **macbook pro 2018**
+
+```
+Model Name:	MacBook Pro
+Model Identifier:	MacBookPro15,1
+Processor Name:	6-Core Intel Core i7
+Processor Speed:	2.2 GHz
+Number of Processors:	1
+Total Number of Cores:	6
+L2 Cache (per Core):	256 KB
+L3 Cache:	9 MB
+Hyper-Threading Technology:	Enabled
+Memory:	16 GB
+System Firmware Version:	1554.60.15.0.0 (iBridge: 18.16.13030.0.0,0
+```
+
+### push operation
+
+```
+goos: darwin
+goarch: amd64
+pkg: github.com/apache/skywalking-satellite/plugins/queue/mmap
+BenchmarkEnqueue
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000         	   10000	    106520 ns/op	    9888 B/op	       9 allocs/op
+BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000         	   18536	     54331 ns/op	    9839 B/op	       9 allocs/op
+BenchmarkEnqueue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000          	   27859	     43251 ns/op	    9815 B/op	       9 allocs/op
+BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000         	   23673	     45910 ns/op	    9839 B/op	       9 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000        	   10000	    131686 ns/op	   18941 B/op	      10 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000        	   23011	     47101 ns/op	    9887 B/op	       9 allocs/op
+PASS
+```
+### push and pop operation
+```
+goos: darwin
+goarch: amd64
+pkg: github.com/apache/skywalking-satellite/plugins/queue/mmap
+BenchmarkEnqueueAndDequeue
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000         	   18895	     53056 ns/op	   28773 B/op	      42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000         	   24104	    117128 ns/op	   28725 B/op	      42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000          	   23733	     71632 ns/op	   28699 B/op	      41 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000         	   26286	     64377 ns/op	   28725 B/op	      42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000        	   10000	    118004 ns/op	   54978 B/op	      43 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000        	   16489	     64400 ns/op	   28772 B/op	      42 allocs/op
+PASS
+```
diff --git a/docs/en/setup/plugins/plugin-list.md b/docs/en/setup/plugins/plugin-list.md
index 19a7023..e200cda 100755
--- a/docs/en/setup/plugins/plugin-list.md
+++ b/docs/en/setup/plugins/plugin-list.md
@@ -14,7 +14,7 @@
 	- [mmap-queue](./queue_mmap-queue.md)
 - Receiver
 	- [grpc-nativelog-receiver](./receiver_grpc-nativelog-receiver.md)
-	- [http-log-receiver](./receiver_http-log-receiver.md)
+	- [http-nativelog-receiver](./receiver_http-nativelog-receiver.md)
 - Server
 	- [grpc-server](./server_grpc-server.md)
 	- [http-server](./server_http-server.md)
diff --git a/docs/en/setup/plugins/queue_mmap-queue.md b/docs/en/setup/plugins/queue_mmap-queue.md
index 2fe4602..4345de7 100755
--- a/docs/en/setup/plugins/queue_mmap-queue.md
+++ b/docs/en/setup/plugins/queue_mmap-queue.md
@@ -4,17 +4,15 @@ This is a memory mapped queue to provide the persistent storage for the input ev
 ## DefaultConfig
 ```yaml
 # The size of each segment. Default value is 128K. The unit is Byte.
-segment_size: 131072
+segment_size: 262114
 # The max num of segments in memory. Default value is 10.
 max_in_mem_segments: 10
 # The capacity of Queue = segment_size * queue_capacity_segments.
-queue_capacity_segments: 4000
+queue_capacity_segments: 2000
 # The period flush time. The unit is ms. Default value is 1 second.
 flush_period: 1000
 # The max number in one flush time.  Default value is 10000.
 flush_ceiling_num: 10000
-# Contains all files in the queue.
-queue_dir: satellite-mmap-queue
 # The max size of the input event. Default value is 20k.
 max_event_size: 20480
 ```
diff --git a/docs/en/setup/plugins/receiver_http-log-receiver.md b/docs/en/setup/plugins/receiver_http-nativelog-receiver.md
similarity index 89%
rename from docs/en/setup/plugins/receiver_http-log-receiver.md
rename to docs/en/setup/plugins/receiver_http-nativelog-receiver.md
index 4e9496a..e32498d 100755
--- a/docs/en/setup/plugins/receiver_http-log-receiver.md
+++ b/docs/en/setup/plugins/receiver_http-nativelog-receiver.md
@@ -1,4 +1,4 @@
-# Receiver/http-log-receiver
+# Receiver/http-nativelog-receiver
 ## Description
 This is a receiver for SkyWalking http logging format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto.
 ## DefaultConfig
diff --git a/internal/pkg/plugin/registry.go b/internal/pkg/plugin/registry.go
index cd6ffc9..6f1b7ae 100644
--- a/internal/pkg/plugin/registry.go
+++ b/internal/pkg/plugin/registry.go
@@ -22,6 +22,8 @@ import (
 	"reflect"
 	"strings"
 
+	"github.com/sirupsen/logrus"
+
 	"github.com/spf13/viper"
 
 	"github.com/apache/skywalking-satellite/internal/pkg/config"
@@ -50,12 +52,18 @@ func RegisterPlugin(plugin Plugin) {
 	for pCategory, pReg := range Reg {
 		if v.Type().Implements(pCategory) {
 			pReg[plugin.Name()] = v
-			log.Logger.Infof("register %s %s successfully", plugin.Name(), v.Type().String())
+			log.Logger.WithFields(logrus.Fields{
+				"category":    v.Type().String(),
+				"plugin_name": plugin.Name(),
+			}).Debug("register plugin success")
 			success = true
 		}
 	}
 	if !success {
-		log.Logger.Errorf("this type of %s is not supported to register : %s", plugin.Name(), v.Type().String())
+		log.Logger.WithFields(logrus.Fields{
+			"category":    v.Type().String(),
+			"plugin_name": plugin.Name(),
+		}).Error("plugin is not allowed to register")
 	}
 }
 
diff --git a/internal/satellite/boot/boot.go b/internal/satellite/boot/boot.go
index 2bfeeb1..8e19e44 100644
--- a/internal/satellite/boot/boot.go
+++ b/internal/satellite/boot/boot.go
@@ -22,11 +22,12 @@ import (
 	"errors"
 	"fmt"
 	"os"
+	"os/signal"
 	"reflect"
 	"sync"
 	"syscall"
 
-	"os/signal"
+	"github.com/sirupsen/logrus"
 
 	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/internal/satellite/config"
@@ -114,7 +115,10 @@ func prepareModules(container ModuleContainer) error {
 				for _, preparedModule := range preparedModules {
 					preparedModule.Shutdown()
 				}
-				log.Logger.Errorf("%s module of %s namespace is error in preparing stage, error is %v", reflect.TypeOf(m).String(), ns, err)
+				log.Logger.WithFields(logrus.Fields{
+					"pipe":   ns,
+					"module": reflect.TypeOf(m).String(),
+				}).Errorf("error in preparing stage: %v", err)
 				return err
 			}
 		}
@@ -125,7 +129,6 @@ func prepareModules(container ModuleContainer) error {
 // bootModules boot all modules.
 func bootModules(ctx context.Context, container ModuleContainer) {
 	log.Logger.Infof("satellite is starting...")
-
 	var wg sync.WaitGroup
 	for _, modules := range container {
 		wg.Add(len(modules))
diff --git a/internal/satellite/config/loader_test.go b/internal/satellite/config/loader_test.go
index eabebf0..9712998 100644
--- a/internal/satellite/config/loader_test.go
+++ b/internal/satellite/config/loader_test.go
@@ -85,54 +85,60 @@ func params() *SatelliteConfig {
 					"brokers":                "127.0.0.1:9092",
 					"version":                "2.1.1",
 					"commonfields_pipe_name": "sharing",
+					"ca_pem_path":            "ca.pem",
+					"client_key_path":        "client.key",
+					"client_pem_path":        "client.pem",
+					"enable_TLS":             false,
 				},
 			},
 			Servers: []plugin.Config{
 				{
 					"plugin_name":            "grpc-server",
 					"commonfields_pipe_name": "sharing",
+					"address":                ":11800",
+					"tls_cert_file":          "",
+					"tls_key_file":           "",
 				},
 				{
 					"plugin_name":            "prometheus-server",
-					"address":                ":8090",
+					"address":                ":1234",
 					"commonfields_pipe_name": "sharing",
+					"endpoint":               "/metrics",
 				},
 			},
 		},
 		Pipes: []*PipeConfig{
 			{
 				PipeCommonConfig: &config.CommonFields{
-					PipeName: "pipe1",
+					PipeName: "logpipe",
 				},
 
 				Gatherer: &gatherer.GathererConfig{
 					ServerName: "grpc-server",
 					CommonFields: &config.CommonFields{
-						PipeName: "pipe1",
+						PipeName: "logpipe",
 					},
 					ReceiverConfig: plugin.Config{
 						"plugin_name":            "grpc-nativelog-receiver",
-						"commonfields_pipe_name": "pipe1",
+						"commonfields_pipe_name": "logpipe",
 					},
 					QueueConfig: plugin.Config{
-						"plugin_name":            "mmap-queue",
-						"segment_size":           524288,
-						"max_in_mem_segments":    6,
-						"queue_dir":              "pipe1-log-grpc-receiver-queue",
-						"commonfields_pipe_name": "pipe1",
+						"commonfields_pipe_name": "logpipe",
+						"plugin_name":            "memory-queue",
+						"event_buffer_size":      5000,
 					},
 				},
 				Processor: &processor.ProcessorConfig{
 					CommonFields: &config.CommonFields{
-						PipeName: "pipe1",
+						PipeName: "logpipe",
 					},
 				},
 				Sender: &sender.SenderConfig{
 					CommonFields: &config.CommonFields{
-						PipeName: "pipe1",
+						PipeName: "logpipe",
 					},
 					FallbackerConfig: plugin.Config{
-						"commonfields_pipe_name": "pipe1",
+						"commonfields_pipe_name": "logpipe",
 						"plugin_name":            "none-fallbacker",
 					},
 					FlushTime:      1000,
@@ -143,7 +149,7 @@ func params() *SatelliteConfig {
 						{
 							"plugin_name":            "nativelog-kafka-forwarder",
 							"topic":                  "log-topic",
-							"commonfields_pipe_name": "pipe1",
+							"commonfields_pipe_name": "logpipe",
 						},
 					},
 				},
diff --git a/internal/satellite/module/gatherer/create.go b/internal/satellite/module/gatherer/create.go
index b161697..8973525 100644
--- a/internal/satellite/module/gatherer/create.go
+++ b/internal/satellite/module/gatherer/create.go
@@ -18,7 +18,6 @@
 package gatherer
 
 import (
-	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
 	"github.com/apache/skywalking-satellite/internal/satellite/sharing"
 	fetcher "github.com/apache/skywalking-satellite/plugins/fetcher/api"
@@ -39,7 +38,6 @@ func NewGatherer(cfg *api.GathererConfig) api.Gatherer {
 
 // newFetcherGatherer crates a gatherer with the fetcher role.
 func newFetcherGatherer(cfg *api.GathererConfig) *FetcherGatherer {
-	log.Logger.Infof("fetcher gatherer module of %s namespace is being initialized", cfg.PipeName)
 	return &FetcherGatherer{
 		config:         cfg,
 		runningQueue:   queue.GetQueue(cfg.QueueConfig),
@@ -50,7 +48,6 @@ func newFetcherGatherer(cfg *api.GathererConfig) *FetcherGatherer {
 
 // newReceiverGatherer crates a gatherer with the receiver role.
 func newReceiverGatherer(cfg *api.GathererConfig) *ReceiverGatherer {
-	log.Logger.Infof("receiver gatherer module of %s namespace is being initialized", cfg.PipeName)
 	return &ReceiverGatherer{
 		config:          cfg,
 		runningQueue:    queue.GetQueue(cfg.QueueConfig),
diff --git a/internal/satellite/module/gatherer/fetcher_gatherer.go b/internal/satellite/module/gatherer/fetcher_gatherer.go
index 2695e3d..13ae0bf 100644
--- a/internal/satellite/module/gatherer/fetcher_gatherer.go
+++ b/internal/satellite/module/gatherer/fetcher_gatherer.go
@@ -22,8 +22,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/prometheus/client_golang/prometheus"
-
 	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/internal/satellite/event"
 	"github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
@@ -44,26 +42,18 @@ type FetcherGatherer struct {
 	outputChannel chan *queue.SequenceEvent
 
 	// metrics
-	fetchCounter       *prometheus.CounterVec
-	queueOutputCounter *prometheus.CounterVec
+	fetchCounter       *telemetry.Counter
+	queueOutputCounter *telemetry.Counter
 }
 
 func (f *FetcherGatherer) Prepare() error {
-	f.fetchCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Name: "gatherer_fetch_count",
-		Help: "Total number of the receiving count in the Gatherer.",
-	}, []string{"pipe", "status"})
-	f.queueOutputCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Subsystem: "fetcher",
-		Name:      "queue_output_count",
-		Help:      "Total number of the output count in the Queue of Gatherer.",
-	}, []string{"pipe", "status"})
-	telemetry.Registerer.MustRegister(f.fetchCounter)
-	telemetry.Registerer.MustRegister(f.queueOutputCounter)
+	f.fetchCounter = telemetry.NewCounter("gatherer_fetch_count", "Total number of the receiving count in the Gatherer.", "pipe", "status")
+	f.queueOutputCounter = telemetry.NewCounter("queue_output_count", "Total number of the output count in the Queue of Gatherer.", "pipe", "status")
 	return nil
 }
 
 func (f *FetcherGatherer) Boot(ctx context.Context) {
+	log.Logger.WithField("pipe", f.config.PipeName).Info("fetch_gatherer module is starting...")
 	var wg sync.WaitGroup
 	wg.Add(2)
 	go func() {
@@ -76,9 +66,9 @@ func (f *FetcherGatherer) Boot(ctx context.Context) {
 				events := f.runningFetcher.Fetch()
 				for _, e := range events {
 					err := f.runningQueue.Enqueue(e)
-					f.fetchCounter.WithLabelValues(f.config.PipeName, "all").Inc()
+					f.fetchCounter.Inc(f.config.PipeName, "all")
 					if err != nil {
-						f.fetchCounter.WithLabelValues(f.config.PipeName, "abandoned").Inc()
+						f.fetchCounter.Inc(f.config.PipeName, "abandoned")
 						log.Logger.Errorf("cannot put event into queue in %s namespace, %v", f.config.PipeName, err)
 					}
 				}
@@ -102,11 +92,11 @@ func (f *FetcherGatherer) Boot(ctx context.Context) {
 			default:
 				if e, err := f.runningQueue.Dequeue(); err == nil {
 					f.outputChannel <- e
-					f.queueOutputCounter.WithLabelValues(f.config.PipeName, "success").Inc()
+					f.queueOutputCounter.Inc(f.config.PipeName, "success")
 				} else if err == queue.ErrEmpty {
 					time.Sleep(time.Second)
 				} else {
-					f.queueOutputCounter.WithLabelValues(f.config.PipeName, "error").Inc()
+					f.queueOutputCounter.Inc(f.config.PipeName, "error")
 					log.Logger.Errorf("error in popping from the queue: %v", err)
 				}
 			}
diff --git a/internal/satellite/module/gatherer/receiver_gatherer.go b/internal/satellite/module/gatherer/receiver_gatherer.go
index aadcbdd..dcf56ec 100644
--- a/internal/satellite/module/gatherer/receiver_gatherer.go
+++ b/internal/satellite/module/gatherer/receiver_gatherer.go
@@ -22,7 +22,7 @@ import (
 	"sync"
 	"time"
 
-	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
 
 	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/internal/satellite/event"
@@ -45,47 +45,40 @@ type ReceiverGatherer struct {
 	// self components
 	outputChannel chan *queue.SequenceEvent
 	// metrics
-	receiveCounter     *prometheus.CounterVec
-	queueOutputCounter *prometheus.CounterVec
+	receiveCounter     *telemetry.Counter
+	queueOutputCounter *telemetry.Counter
 }
 
 func (r *ReceiverGatherer) Prepare() error {
-	log.Logger.Infof("receiver gatherer module of %s namespace is preparing", r.config.PipeName)
+	log.Logger.WithField("pipe", r.config.PipeName).Info("receiver gatherer module is preparing...")
 	r.runningReceiver.RegisterHandler(r.runningServer.GetServer())
 	if err := r.runningQueue.Initialize(); err != nil {
-		log.Logger.Infof("the %s queue of %s namespace was failed to initialize", r.runningQueue.Name(), r.config.PipeName)
+		log.Logger.WithField("pipe", r.config.PipeName).Infof("the %s queue failed when initializing", r.runningQueue.Name())
 		return err
 	}
-	r.receiveCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Name: "gatherer_receive_count",
-		Help: "Total number of the receiving count in the Gatherer.",
-	}, []string{"pipe", "status"})
-	r.queueOutputCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Subsystem: "receiver",
-		Name:      "queue_output_count",
-		Help:      "Total number of the output count in the Queue of Gatherer.",
-	}, []string{"pipe", "status"})
-	telemetry.Registerer.MustRegister(r.receiveCounter)
-	log.Logger.Infof("register receiveCounter")
-	telemetry.Registerer.MustRegister(r.queueOutputCounter)
-	log.Logger.Infof("register queueOutputCounter")
+	r.receiveCounter = telemetry.NewCounter("gatherer_receive_count", "Total number of the receiving count in the Gatherer.", "pipe", "status")
+	r.queueOutputCounter = telemetry.NewCounter("queue_output_count", "Total number of the output count in the Queue of Gatherer.", "pipe", "status")
 	return nil
 }
 
 func (r *ReceiverGatherer) Boot(ctx context.Context) {
 	var wg sync.WaitGroup
 	wg.Add(2)
+	log.Logger.WithField("pipe", r.config.PipeName).Info("receive_gatherer module is starting...")
 	go func() {
 		childCtx, cancel := context.WithCancel(ctx)
 		defer wg.Done()
 		for {
 			select {
 			case e := <-r.runningReceiver.Channel():
-				r.receiveCounter.WithLabelValues(r.config.PipeName, "all").Inc()
+				r.receiveCounter.Inc(r.config.PipeName, "all")
 				err := r.runningQueue.Enqueue(e)
 				if err != nil {
-					r.receiveCounter.WithLabelValues(r.config.PipeName, "abandoned").Inc()
-					log.Logger.Errorf("cannot put event into queue in %s namespace, error is: %v", r.config.PipeName, err)
+					r.receiveCounter.Inc(r.config.PipeName, "abandoned")
+					log.Logger.WithFields(logrus.Fields{
+						"pipe":  r.config.PipeName,
+						"queue": r.runningQueue.Name(),
+					}).Errorf("error in enqueue: %v", err)
 				}
 			case <-childCtx.Done():
 				cancel()
@@ -107,12 +100,15 @@ func (r *ReceiverGatherer) Boot(ctx context.Context) {
 			default:
 				if e, err := r.runningQueue.Dequeue(); err == nil {
 					r.outputChannel <- e
-					r.queueOutputCounter.WithLabelValues(r.config.PipeName, "success").Inc()
+					r.queueOutputCounter.Inc(r.config.PipeName, "success")
 				} else if err == queue.ErrEmpty {
 					time.Sleep(time.Second)
 				} else {
-					r.queueOutputCounter.WithLabelValues(r.config.PipeName, "error").Inc()
-					log.Logger.Errorf("error in popping from the queue: %v", err)
+					r.queueOutputCounter.Inc(r.config.PipeName, "error")
+					log.Logger.WithFields(logrus.Fields{
+						"pipe":  r.config.PipeName,
+						"queue": r.runningQueue.Name(),
+					}).Errorf("error in dequeue: %v", err)
 				}
 			}
 		}
@@ -121,9 +117,12 @@ func (r *ReceiverGatherer) Boot(ctx context.Context) {
 }
 
 func (r *ReceiverGatherer) Shutdown() {
-	log.Logger.Infof("receiver gatherer module of %s namespace is closing", r.config.PipeName)
+	log.Logger.WithField("pipe", r.config.PipeName).Infof("receiver gatherer module is closing...")
 	if err := r.runningQueue.Close(); err != nil {
-		log.Logger.Errorf("failure occurs when closing %s queue  in %s namespace, error is: %v", r.runningQueue.Name(), r.config.PipeName, err)
+		log.Logger.WithFields(logrus.Fields{
+			"pipe":  r.config.PipeName,
+			"queue": r.runningQueue.Name(),
+		}).Errorf("error in closing: %v", err)
 	}
 }
 
diff --git a/internal/satellite/module/processor/create.go b/internal/satellite/module/processor/create.go
index 85e980c..f33a70b 100644
--- a/internal/satellite/module/processor/create.go
+++ b/internal/satellite/module/processor/create.go
@@ -18,7 +18,6 @@
 package processor
 
 import (
-	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	gatherer "github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
 	"github.com/apache/skywalking-satellite/internal/satellite/module/processor/api"
 	sender "github.com/apache/skywalking-satellite/internal/satellite/module/sender/api"
@@ -27,7 +26,6 @@ import (
 
 // Init Processor and dependency plugins
 func NewProcessor(cfg *api.ProcessorConfig, s sender.Sender, g gatherer.Gatherer) api.Processor {
-	log.Logger.Infof("processor module of %s namespace is being initialized", cfg.PipeName)
 	p := &Processor{
 		sender:         s,
 		gatherer:       g,
diff --git a/internal/satellite/module/processor/processor.go b/internal/satellite/module/processor/processor.go
index a84fd9a..765ae4f 100644
--- a/internal/satellite/module/processor/processor.go
+++ b/internal/satellite/module/processor/processor.go
@@ -49,7 +49,7 @@ func (p *Processor) Prepare() error {
 
 // Boot fetches the data of Queue, does a series of processing, and then sends to Sender.
 func (p *Processor) Boot(ctx context.Context) {
-	log.Logger.Infof("processor module of %s namespace is running", p.config.PipeName)
+	log.Logger.WithField("pipe", p.config.PipeName).Info("processor module is starting...")
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
diff --git a/internal/satellite/module/sender/create.go b/internal/satellite/module/sender/create.go
index 595e867..5da3cca 100644
--- a/internal/satellite/module/sender/create.go
+++ b/internal/satellite/module/sender/create.go
@@ -18,7 +18,6 @@
 package sender
 
 import (
-	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/internal/satellite/event"
 	"github.com/apache/skywalking-satellite/internal/satellite/module/buffer"
 	gatherer "github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
@@ -31,7 +30,6 @@ import (
 
 // NewSender crate a Sender.
 func NewSender(cfg *api.SenderConfig, g gatherer.Gatherer) api.Sender {
-	log.Logger.Infof("sender module of %s namespace is being initialized", cfg.PipeName)
 	s := &Sender{
 		config:            cfg,
 		runningForwarders: []forwarder.Forwarder{},
diff --git a/internal/satellite/module/sender/sender.go b/internal/satellite/module/sender/sender.go
index 07a5e7a..d1f7f64 100644
--- a/internal/satellite/module/sender/sender.go
+++ b/internal/satellite/module/sender/sender.go
@@ -22,7 +22,7 @@ import (
 	"sync"
 	"time"
 
-	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
 
 	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/internal/satellite/event"
@@ -57,12 +57,12 @@ type Sender struct {
 	buffer        *buffer.BatchBuffer            // cache the downstream input data
 
 	// metrics
-	sendCounter *prometheus.CounterVec
+	sendCounter *telemetry.Counter
 }
 
 // Prepare register the client status listener to the client manager and open input channel.
 func (s *Sender) Prepare() error {
-	log.Logger.Infof("sender module of %s namespace is preparing", s.config.PipeName)
+	log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is preparing...")
 	s.runningClient.RegisterListener(s.listener)
 	s.logicInput = s.physicalInput
 	for _, runningForwarder := range s.runningForwarders {
@@ -71,18 +71,13 @@ func (s *Sender) Prepare() error {
 			return err
 		}
 	}
-	s.sendCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Name: "sender_send_count",
-		Help: "Total number of the output count in the Sender.",
-	}, []string{"pipe", "status", "type"})
-	telemetry.Registerer.MustRegister(s.sendCounter)
-	log.Logger.Infof("register sendCounter")
+	s.sendCounter = telemetry.NewCounter("sender_output_count", "Total number of the output count in the Sender.", "pipe", "status", "type")
 	return nil
 }
 
 // Boot fetches the downstream input data and forward to external services, such as Kafka and OAP receiver.
 func (s *Sender) Boot(ctx context.Context) {
-	log.Logger.Infof("sender module of %s namespace is running", s.config.PipeName)
+	log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is starting...")
 	var wg sync.WaitGroup
 	wg.Add(2)
 	// 1. keep fetching the downstream data when client connected, and put it into BatchBuffer.
@@ -140,7 +135,7 @@ func (s *Sender) Boot(ctx context.Context) {
 
 // Shutdown closes the channels and tries to force forward the events in the buffer.
 func (s *Sender) Shutdown() {
-	log.Logger.Infof("sender module of %s namespace is closing", s.config.PipeName)
+	log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is closing")
 	close(s.logicInput)
 	for buf := range s.flushChannel {
 		s.consume(buf)
@@ -151,8 +146,11 @@ func (s *Sender) Shutdown() {
 
 // consume would forward the events by type and ack this batch.
 func (s *Sender) consume(batch *buffer.BatchBuffer) {
-	log.Logger.Infof("sender module of %s namespace is flushing a new batch buffer."+
-		" the start offset is %s, and the size is %d", s.config.PipeName, batch.Last(), batch.Len())
+	log.Logger.WithFields(logrus.Fields{
+		"pipe":   s.config.PipeName,
+		"offset": batch.Last(),
+		"size":   batch.Len(),
+	}).Info("sender module is flushing a new batch buffer.")
 	var events = make(map[protocol.EventType]event.BatchEvents)
 	for i := 0; i < batch.Len(); i++ {
 		eventContext := batch.Buf()[i]
@@ -168,11 +166,11 @@ func (s *Sender) consume(batch *buffer.BatchBuffer) {
 				continue
 			}
 			if err := f.Forward(batchEvents); err == nil {
-				s.sendCounter.WithLabelValues(s.config.PipeName, "success", f.ForwardType().String()).Add(float64(len(batchEvents)))
+				s.sendCounter.Add(float64(len(batchEvents)), s.config.PipeName, "success", f.ForwardType().String())
 				continue
 			}
 			if !s.runningFallbacker.FallBack(batchEvents, f.Forward) {
-				s.sendCounter.WithLabelValues(s.config.PipeName, "failure", f.ForwardType().String()).Add(float64(len(batchEvents)))
+				s.sendCounter.Add(float64(len(batchEvents)), s.config.PipeName, "failure", f.ForwardType().String())
 			}
 		}
 	}
diff --git a/internal/satellite/telemetry/collector.go b/internal/satellite/telemetry/collector.go
new file mode 100644
index 0000000..d521b16
--- /dev/null
+++ b/internal/satellite/telemetry/collector.go
@@ -0,0 +1,22 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package telemetry
+
+// The Self-telemetry data collection interface.
+type Collector interface {
+}
diff --git a/internal/satellite/telemetry/telemetry.go b/internal/satellite/telemetry/config.go
similarity index 76%
copy from internal/satellite/telemetry/telemetry.go
copy to internal/satellite/telemetry/config.go
index 660b57b..f139631 100644
--- a/internal/satellite/telemetry/telemetry.go
+++ b/internal/satellite/telemetry/config.go
@@ -17,16 +17,7 @@
 
 package telemetry
 
-import (
-	"github.com/prometheus/client_golang/prometheus"
-)
-
-// Registerer is the global metrics center for collecting the telemetry data in core modules or plugins.
-var (
-	registry   *prometheus.Registry
-	Registerer prometheus.Registerer // The register is for adding metrics to the registry.
-	Gatherer   prometheus.Gatherer   // The gatherer is for fetching metrics from the registry.
-)
+import "github.com/prometheus/client_golang/prometheus"
 
 // Config defines the common telemetry labels.
 type Config struct {
@@ -48,6 +39,7 @@ func Init(c *Config) {
 		labels["instance"] = c.Instance
 	}
 	registry = prometheus.NewRegistry()
-	Registerer = prometheus.WrapRegistererWith(labels, registry)
+	registerer = prometheus.WrapRegistererWith(labels, registry)
 	Gatherer = registry
+	collectorContainer = make(map[string]Collector)
 }
diff --git a/internal/satellite/telemetry/counter.go b/internal/satellite/telemetry/counter.go
new file mode 100644
index 0000000..9a91267
--- /dev/null
+++ b/internal/satellite/telemetry/counter.go
@@ -0,0 +1,57 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package telemetry
+
+import "github.com/prometheus/client_golang/prometheus"
+
+// The counter metric.
+type Counter struct {
+	Collector
+	name    string // The name of counter.
+	counter *prometheus.CounterVec
+}
+
+// NewCounter create a new counter if no metric with the same name exists.
+func NewCounter(name, help string, labels ...string) *Counter {
+	lock.Lock()
+	defer lock.Unlock()
+	collector, ok := collectorContainer[name]
+	if !ok {
+		counter := &Counter{
+			name: name,
+			counter: prometheus.NewCounterVec(prometheus.CounterOpts{
+				Name: name,
+				Help: help,
+			}, labels),
+		}
+		Register(WithMeta(name, counter.counter))
+		collectorContainer[name] = counter
+		collector = counter
+	}
+	return collector.(*Counter)
+}
+
+// Add one.
+func (c *Counter) Inc(labelValues ...string) {
+	c.counter.WithLabelValues(labelValues...).Inc()
+}
+
+// Add float value.
+func (c *Counter) Add(val float64, labelValues ...string) {
+	c.counter.WithLabelValues(labelValues...).Add(val)
+}
diff --git a/internal/satellite/telemetry/telemetry.go b/internal/satellite/telemetry/telemetry.go
index 660b57b..eaeb2fb 100644
--- a/internal/satellite/telemetry/telemetry.go
+++ b/internal/satellite/telemetry/telemetry.go
@@ -18,36 +18,37 @@
 package telemetry
 
 import (
+	"sync"
+
 	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
 )
 
-// Registerer is the global metrics center for collecting the telemetry data in core modules or plugins.
+// registerer is the global metrics center for collecting the telemetry data in core modules or plugins.
 var (
-	registry   *prometheus.Registry
-	Registerer prometheus.Registerer // The register is for adding metrics to the registry.
-	Gatherer   prometheus.Gatherer   // The gatherer is for fetching metrics from the registry.
+	Gatherer           prometheus.Gatherer // The gatherer is for fetching metrics from the registry.
+	registry           *prometheus.Registry
+	registerer         prometheus.Registerer // The register is for adding metrics to the registry.
+	collectorContainer map[string]Collector
+	lock               sync.Mutex
 )
 
-// Config defines the common telemetry labels.
-type Config struct {
-	Cluster  string `mapstructure:"cluster"`  // The cluster name.
-	Service  string `mapstructure:"service"`  // The service name.
-	Instance string `mapstructure:"instance"` // The instance name.
+// register the metric meta to the registerer.
+func Register(meta ...SelfTelemetryMetaFunc) {
+	for _, telemetryMeta := range meta {
+		name, collector := telemetryMeta()
+		registerer.MustRegister(collector)
+		log.Logger.WithField("telemetry_name", name).Info("self telemetry register success")
+	}
 }
 
-// Init create the global telemetry center according to the config.
-func Init(c *Config) {
-	labels := make(map[string]string)
-	if c.Service != "" {
-		labels["service"] = c.Service
-	}
-	if c.Cluster != "" {
-		labels["cluster"] = c.Cluster
-	}
-	if c.Instance != "" {
-		labels["instance"] = c.Instance
+// SelfTelemetryMetaFunc returns the metric name and the metric instance.
+type SelfTelemetryMetaFunc func() (string, prometheus.Collector)
+
+// WithMeta is used as the param of the Register function.
+func WithMeta(name string, collector prometheus.Collector) SelfTelemetryMetaFunc {
+	return func() (string, prometheus.Collector) {
+		return name, collector
 	}
-	registry = prometheus.NewRegistry()
-	Registerer = prometheus.WrapRegistererWith(labels, registry)
-	Gatherer = registry
 }
diff --git a/plugins/queue/memory/queue.go b/plugins/queue/memory/queue.go
index 15fe84a..cbd5ac4 100644
--- a/plugins/queue/memory/queue.go
+++ b/plugins/queue/memory/queue.go
@@ -71,11 +71,12 @@ func (q *Queue) Enqueue(e *protocol.Event) error {
 func (q *Queue) Dequeue() (*api.SequenceEvent, error) {
 	element, err := q.buffer.Dequeue()
 	if err != nil {
-		log.Logger.Errorf("error in dequeue: %v", err)
+		log.Logger.Debugf("error in dequeue: %v", err)
 		return nil, api.ErrEmpty
 	}
 	return &api.SequenceEvent{
-		Event: element.(*protocol.Event),
+		Event:  element.(*protocol.Event),
+		Offset: "no_offset_in_memory_queue",
 	}, nil
 }
 
diff --git a/plugins/queue/mmap/branchmark_test.go b/plugins/queue/mmap/branchmark_test.go
index 4a385d2..536b683 100644
--- a/plugins/queue/mmap/branchmark_test.go
+++ b/plugins/queue/mmap/branchmark_test.go
@@ -36,18 +36,19 @@ type benchmarkParam struct {
 }
 
 var params = []benchmarkParam{
-	{segmentSize: 1024 * 128, message: 8, maxInMemSegments: 10, queueCapacity: 10000},
+	{segmentSize: 1024 * 128, message: 8, maxInMemSegments: 18, queueCapacity: 10000},
 	// compare the influence of the segmentSize.
 	{segmentSize: 1024 * 256, message: 8, maxInMemSegments: 10, queueCapacity: 10000},
+	{segmentSize: 1024 * 512, message: 8, maxInMemSegments: 6, queueCapacity: 10000},
 	// compare the influence of the maxInMemSegments.
-	{segmentSize: 1024 * 128, message: 8, maxInMemSegments: 20, queueCapacity: 10000},
+	{segmentSize: 1024 * 256, message: 8, maxInMemSegments: 20, queueCapacity: 10000},
 	// compare the influence of the message size.
 	{segmentSize: 1024 * 128, message: 16, maxInMemSegments: 10, queueCapacity: 10000},
 	{segmentSize: 1024 * 128, message: 8, maxInMemSegments: 10, queueCapacity: 100000},
 }
 
 func cleanBenchmarkQueue(b *testing.B, q api.Queue) {
-	if err := os.RemoveAll(q.(*Queue).QueueDir); err != nil {
+	if err := os.RemoveAll(q.(*Queue).queueName); err != nil {
 		b.Errorf("cannot remove test queue dir, %v", err)
 	}
 }
@@ -58,7 +59,6 @@ func BenchmarkEnqueue(b *testing.B) {
 			param.segmentSize/1024, param.maxInMemSegments, param.message, param.queueCapacity)
 		b.Run(name, func(b *testing.B) {
 			q, err := initMmapQueue(plugin.Config{
-				"queue_dir":               "BenchmarkEnqueue",
 				"segment_size":            param.segmentSize,
 				"max_in_mem_segments":     param.maxInMemSegments,
 				"queue_capacity_segments": param.queueCapacity,
@@ -88,7 +88,6 @@ func BenchmarkEnqueueAndDequeue(b *testing.B) {
 			param.segmentSize/1024, param.maxInMemSegments, param.message, param.queueCapacity)
 		b.Run(name, func(b *testing.B) {
 			q, err := initMmapQueue(plugin.Config{
-				"queue_dir":               "BenchmarkEnqueueAndDequeue",
 				"segment_size":            param.segmentSize,
 				"max_in_mem_segments":     param.maxInMemSegments,
 				"queue_capacity_segments": param.queueCapacity,
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
index c717a35..14d2687 100644
--- a/plugins/queue/mmap/queue.go
+++ b/plugins/queue/mmap/queue.go
@@ -50,15 +50,16 @@ const (
 type Queue struct {
 	config.CommonFields
 	// config
-	SegmentSize           int    `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
-	MaxInMemSegments      int32  `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
-	QueueCapacitySegments int    `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
-	FlushPeriod           int    `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
-	FlushCeilingNum       int    `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
-	MaxEventSize          int    `mapstructure:"max_event_size"`          // The max size of the input event.
-	QueueDir              string `mapstructure:"queue_dir"`               // Contains all files in the queue.
+	SegmentSize           int   `mapstructure:"segment_size"`            // The size of each segment. The unit is byte.
+	MaxInMemSegments      int32 `mapstructure:"max_in_mem_segments"`     // The max num of segments in memory.
+	QueueCapacitySegments int   `mapstructure:"queue_capacity_segments"` // The capacity of Queue = segment_size * queue_capacity_segments.
+	FlushPeriod           int   `mapstructure:"flush_period"`            // The period flush time. The unit is ms.
+	FlushCeilingNum       int   `mapstructure:"flush_ceiling_num"`       // The max number in one flush time.
+	MaxEventSize          int   `mapstructure:"max_event_size"`          // The max size of the input event.
 
 	// running components
+	lock                   sync.Mutex
+	queueName              string         // The queue name.
 	meta                   *meta.Metadata // The metadata file.
 	segments               []*mmap.File   // The data files.
 	mmapCount              int32          // The number of the memory mapped files.
@@ -72,7 +73,6 @@ type Queue struct {
 	ctx        context.Context    // Parent ctx
 	cancel     context.CancelFunc // Parent ctx cancel function
 	showDownWg sync.WaitGroup     // The shutdown wait group.
-
 }
 
 func (q *Queue) Name() string {
@@ -87,17 +87,15 @@ func (q *Queue) Description() string {
 func (q *Queue) DefaultConfig() string {
 	return `
 # The size of each segment. Default value is 128K. The unit is Byte.
-segment_size: 131072
+segment_size: 262114
 # The max num of segments in memory. Default value is 10.
 max_in_mem_segments: 10
 # The capacity of Queue = segment_size * queue_capacity_segments.
-queue_capacity_segments: 4000
+queue_capacity_segments: 2000
 # The period flush time. The unit is ms. Default value is 1 second.
 flush_period: 1000
 # The max number in one flush time.  Default value is 10000.
 flush_ceiling_num: 10000
-# Contains all files in the queue.
-queue_dir: satellite-mmap-queue
 # The max size of the input event. Default value is 20k.
 max_event_size: 20480
 `
@@ -116,8 +114,9 @@ func (q *Queue) Initialize() error {
 	if q.MaxInMemSegments < minimumSegments {
 		q.MaxInMemSegments = minimumSegments
 	}
+	q.queueName = strings.Join([]string{q.Name(), q.PipeName}, "_")
 	// load metadata and override the reading or writing offset by the committed or watermark offset.
-	md, err := meta.NewMetaData(q.QueueDir, q.QueueCapacitySegments)
+	md, err := meta.NewMetaData(q.queueName, q.QueueCapacitySegments)
 	if err != nil {
 		return fmt.Errorf("error in creating the metadata: %v", err)
 	}
@@ -175,6 +174,8 @@ func (q *Queue) Dequeue() (*api.SequenceEvent, error) {
 }
 
 func (q *Queue) Close() error {
+	q.lock.Lock()
+	defer q.lock.Unlock()
 	q.cancel()
 	q.showDownWg.Wait()
 	for i, segment := range q.segments {
diff --git a/plugins/queue/mmap/queue_operation.go b/plugins/queue/mmap/queue_operation.go
index 64c5bba..9ee6e74 100644
--- a/plugins/queue/mmap/queue_operation.go
+++ b/plugins/queue/mmap/queue_operation.go
@@ -43,6 +43,8 @@ const uInt64Size = 8
 // enqueue writes the data into the file system. It first writes the length of the data,
 // then the data itself. It means the whole data may not exist in the one segments.
 func (q *Queue) enqueue(bytes []byte) error {
+	q.lock.Lock()
+	defer q.lock.Unlock()
 	if q.isFull() {
 		return api.ErrFull
 	}
@@ -67,6 +69,8 @@ func (q *Queue) enqueue(bytes []byte) error {
 // dequeue reads the data from the file system. It first reads the length of the data,
 // then the data itself. It means the whole data may not exist in the one segments.
 func (q *Queue) dequeue() (data []byte, rid, roffset int64, err error) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
 	if q.isEmpty() {
 		return nil, 0, 0, api.ErrEmpty
 	}
diff --git a/plugins/queue/mmap/queue_test.go b/plugins/queue/mmap/queue_test.go
index b172ea0..a088bbd 100644
--- a/plugins/queue/mmap/queue_test.go
+++ b/plugins/queue/mmap/queue_test.go
@@ -48,6 +48,7 @@ func initMmapQueue(cfg plugin.Config) (*Queue, error) {
 		config[k] = v
 	}
 	q := api.GetQueue(config)
+	q.(*Queue).PipeName = "test-pipe"
 	if q == nil {
 		return nil, fmt.Errorf("cannot get a default config mmap queue from the registry")
 	}
@@ -58,7 +59,7 @@ func initMmapQueue(cfg plugin.Config) (*Queue, error) {
 }
 
 func cleanTestQueue(t *testing.T, q api.Queue) {
-	if err := os.RemoveAll(q.(*Queue).QueueDir); err != nil {
+	if err := os.RemoveAll(q.(*Queue).queueName); err != nil {
 		t.Errorf("cannot remove test queue dir, %v", err)
 	}
 }
@@ -148,9 +149,7 @@ func getLargeEvent(n int) *protocol.Event {
 }
 
 func TestQueue_Normal(t *testing.T) {
-	q, err := initMmapQueue(plugin.Config{
-		"queue_dir": "TestQueue_Normal",
-	})
+	q, err := initMmapQueue(plugin.Config{})
 	defer cleanTestQueue(t, q)
 	if err != nil {
 		t.Fatalf("error in initializing the mmap queue: %v", err)
@@ -173,7 +172,6 @@ func TestQueue_Normal(t *testing.T) {
 
 func TestQueue_ReadHistory(t *testing.T) {
 	cfg := plugin.Config{
-		"queue_dir":    "TestQueue_ReadHistory",
 		"segment_size": 10240,
 	}
 
@@ -236,7 +234,6 @@ func TestQueue_ReadHistory(t *testing.T) {
 
 func TestQueue_PushOverCeilingMsg(t *testing.T) {
 	cfg := plugin.Config{
-		"queue_dir":      "TestQueue_PushOverCeilingMsg",
 		"segment_size":   10240,
 		"max_event_size": 1024 * 8,
 	}
@@ -256,7 +253,6 @@ func TestQueue_PushOverCeilingMsg(t *testing.T) {
 
 func TestQueue_FlushWhenReachNum(t *testing.T) {
 	cfg := plugin.Config{
-		"queue_dir":         "TestQueue_FlushWhenReachNum",
 		"segment_size":      10240,
 		"flush_ceiling_num": 5,
 		"flush_period":      1000 * 60,
@@ -284,7 +280,6 @@ func TestQueue_FlushWhenReachNum(t *testing.T) {
 
 func TestQueue_FlushPeriod(t *testing.T) {
 	cfg := plugin.Config{
-		"queue_dir":         "TestQueue_FlushPeriod",
 		"segment_size":      10240,
 		"flush_ceiling_num": 50,
 		"flush_period":      1000 * 1,
@@ -312,7 +307,6 @@ func TestQueue_FlushPeriod(t *testing.T) {
 
 func TestQueue_MemCost(t *testing.T) {
 	cfg := plugin.Config{
-		"queue_dir":           "TestQueue_MemCost",
 		"segment_size":        1024 * 4,
 		"max_in_mem_segments": 8,
 	}
@@ -340,7 +334,6 @@ func TestQueue_MemCost(t *testing.T) {
 
 func TestQueue_OverSegmentEvent(t *testing.T) {
 	cfg := plugin.Config{
-		"queue_dir":    "TestQueue_OverSegmentEvent",
 		"segment_size": 1024 * 4,
 	}
 	q, err := initMmapQueue(cfg)
@@ -363,7 +356,6 @@ func TestQueue_OverSegmentEvent(t *testing.T) {
 
 func TestQueue_ReusingFiles(t *testing.T) {
 	cfg := plugin.Config{
-		"queue_dir":               "TestQueue_ReusingFiles",
 		"segment_size":            1024 * 4,
 		"queue_capacity_segments": 5,
 		"max_event_size":          1024 * 3,
@@ -394,7 +386,6 @@ func TestQueue_ReusingFiles(t *testing.T) {
 
 func TestQueue_Empty(t *testing.T) {
 	cfg := plugin.Config{
-		"queue_dir":               "TestQueue_ReusingFiles",
 		"segment_size":            1024 * 4,
 		"queue_capacity_segments": 10,
 	}
@@ -422,7 +413,6 @@ func TestQueue_Empty(t *testing.T) {
 
 func TestQueue_Full(t *testing.T) {
 	cfg := plugin.Config{
-		"queue_dir":               "TestQueue_ReusingFiles",
 		"segment_size":            1024 * 4,
 		"queue_capacity_segments": 10,
 	}
diff --git a/plugins/queue/mmap/segment_operation.go b/plugins/queue/mmap/segment_operation.go
index e80e120..42d54b2 100644
--- a/plugins/queue/mmap/segment_operation.go
+++ b/plugins/queue/mmap/segment_operation.go
@@ -35,7 +35,6 @@ import (
 
 // GetSegment returns a memory mapped file at the segmentID position.
 func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
-	index := q.GetIndex(segmentID)
 	if q.mmapCount >= q.MaxInMemSegments {
 		q.insufficientMemChannel <- struct{}{}
 		<-q.sufficientMemChannel
@@ -43,6 +42,7 @@ func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
 	if err := q.mapSegment(segmentID); err != nil {
 		return nil, err
 	}
+	index := q.GetIndex(segmentID)
 	if q.segments[index] != nil {
 		return q.segments[index], nil
 	}
@@ -55,7 +55,7 @@ func (q *Queue) mapSegment(segmentID int64) error {
 	if q.segments[index] != nil {
 		return nil
 	}
-	filePath := path.Join(q.QueueDir, strconv.Itoa(index)+segment.FileSuffix)
+	filePath := path.Join(q.queueName, strconv.Itoa(index)+segment.FileSuffix)
 	file, err := segment.NewSegment(filePath, q.SegmentSize)
 	if err != nil {
 		return err
@@ -113,6 +113,8 @@ func (q *Queue) doSwap() error {
 	logicWID := wID + int64(q.QueueCapacitySegments)
 	wIndex := q.GetIndex(wID)
 	rIndex := q.GetIndex(rID)
+	//  only clear all memory-mapped file when more than 1.5 times MaxInMemSegments.
+	clearAll := (wID - rID + 1) > int64(q.MaxInMemSegments)*3/2
 	for q.mmapCount >= q.MaxInMemSegments {
 		for i := logicWID - 1; i >= 0 && i >= logicWID-int64(q.MaxInMemSegments); i-- {
 			if q.GetIndex(i) == wIndex || q.GetIndex(i) == rIndex {
@@ -123,7 +125,7 @@ func (q *Queue) doSwap() error {
 			}
 			// the writing segment and the reading segment should still in memory.
 			// q.MaxInMemSegments/2-1 means keeping half available spaces to receive new data.
-			if q.MaxInMemSegments-q.mmapCount >= q.MaxInMemSegments/2-1 {
+			if !clearAll && q.MaxInMemSegments-q.mmapCount >= q.MaxInMemSegments/2-1 {
 				return nil
 			}
 		}
diff --git a/plugins/receiver/http/nativcelog/receiver.go b/plugins/receiver/http/nativcelog/receiver.go
index 7d78e51..9bb5055 100644
--- a/plugins/receiver/http/nativcelog/receiver.go
+++ b/plugins/receiver/http/nativcelog/receiver.go
@@ -36,8 +36,8 @@ import (
 )
 
 const (
-	Name      = "http-log-receiver"
-	eventName = "http-log-event"
+	Name      = "http-nativelog-receiver"
+	eventName = "http-nativelog-event"
 	success   = "success"
 	failing   = "failing"
 )
diff --git a/plugins/receiver/http/nativcelog/receiver_test.go b/plugins/receiver/http/nativcelog/receiver_test.go
index a1344d4..7790607 100644
--- a/plugins/receiver/http/nativcelog/receiver_test.go
+++ b/plugins/receiver/http/nativcelog/receiver_test.go
@@ -178,7 +178,7 @@ func initServer(cfg plugin.Config, t *testing.T) server.Server {
 }
 
 func initReceiver(cfg plugin.Config, t *testing.T) receiver.Receiver {
-	cfg[plugin.NameField] = "http-log-receiver"
+	cfg[plugin.NameField] = Name
 	q := receiver.GetReceiver(cfg)
 	if q == nil {
 		t.Fatalf("cannot get http-log-receiver from the registry")
diff --git a/plugins/server/grpc/server.go b/plugins/server/grpc/server.go
index c640399..0d1e1bd 100644
--- a/plugins/server/grpc/server.go
+++ b/plugins/server/grpc/server.go
@@ -90,8 +90,9 @@ func (s *Server) Prepare() error {
 
 func (s *Server) Start() error {
 	go func() {
+		log.Logger.WithField("address", s.Address).Info("grpc server is starting...")
 		if err := s.server.Serve(s.listener); err != nil {
-			log.Logger.Fatalf("failed to open a grpc serve: %v", err)
+			log.Logger.WithField("address", s.Address).Infof("grpc server has failure when starting: %v", err)
 		}
 	}()
 	return nil
diff --git a/plugins/server/http/server.go b/plugins/server/http/server.go
index d23de7f..c26d86e 100644
--- a/plugins/server/http/server.go
+++ b/plugins/server/http/server.go
@@ -53,10 +53,11 @@ func (s *Server) Prepare() error {
 }
 
 func (s *Server) Start() error {
+	log.Logger.WithField("address", s.Address).Info("http server is starting...")
 	go func() {
 		err := http.ListenAndServe(s.Address, s.Server)
 		if err != nil {
-			log.Logger.Errorf("start http server error: %v", err)
+			log.Logger.WithField("address", s.Address).Infof("http server has failure when starting: %v", err)
 		}
 	}()
 	return nil
diff --git a/plugins/server/prometheus/prometheus.go b/plugins/server/prometheus/prometheus.go
index 17b0543..fa4bc53 100644
--- a/plugins/server/prometheus/prometheus.go
+++ b/plugins/server/prometheus/prometheus.go
@@ -62,14 +62,15 @@ func (s *Server) Prepare() error {
 
 func (s *Server) Start() error {
 	// add go info metrics.
-	telemetry.Registerer.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
-	telemetry.Registerer.MustRegister(prometheus.NewGoCollector())
+	telemetry.Register(telemetry.WithMeta("processor_collector", prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})),
+		telemetry.WithMeta("go_collector", prometheus.NewGoCollector()))
 	// register prometheus metrics exporter handler.
 	s.server.Handle(s.Endpoint, promhttp.HandlerFor(telemetry.Gatherer, promhttp.HandlerOpts{ErrorLog: log.Logger}))
 	go func() {
+		log.Logger.WithField("address", s.Address).Info("prometheus server is starting...")
 		err := http.ListenAndServe(s.Address, s.server)
 		if err != nil {
-			log.Logger.Errorf("start prometheus http server error: %v", err)
+			log.Logger.WithField("address", s.Address).Infof("prometheus server has failure when starting: %v", err)
 		}
 	}()
 	return nil
diff --git a/tools/docker_build.sh b/tools/docker_build.sh
index c61033a..d6297d8 100644
--- a/tools/docker_build.sh
+++ b/tools/docker_build.sh
@@ -35,7 +35,9 @@ docker build --build-arg DIST_NAME="$DIST_NAME" -t skywalking-satellite:"$VERSIO
 
 if [ $? -eq 0 ]; then
  echo "skywalking-satellite:$VERSION docker images build success!"
+ rm -rf "$DOCKER_DIST_FILE"
 else
  echo "skywalking-satellite:$VERSION docker images build failure!"
+ rm -rf "$DOCKER_DIST_FILE"
  exit 1
 fi