You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2021/02/20 15:00:47 UTC

[skywalking-satellite] branch main updated: polish codes for 0.1.0 release (#29)

This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new 917c09b  polish codes for 0.1.0 release (#29)
917c09b is described below

commit 917c09b6a930968c233d513f3d8673cd34546702
Author: Evan <31...@users.noreply.github.com>
AuthorDate: Sat Feb 20 23:00:41 2021 +0800

    polish codes for 0.1.0 release (#29)
    
    Co-authored-by: Evan <ev...@outlook.com>
---
 README.md                                          |   7 +-
 cmd/command.go                                     |  20 +-
 configs/satellite_config.yaml                      |   4 +-
 docs/en/guides/contribuation/How-to-release.md     | 229 +++++++++++++++++++++
 docs/en/setup/plugins/server_grpc-server.md        |   4 +-
 internal/satellite/boot/boot.go                    |   4 +-
 internal/satellite/module/api/module.go            |   5 +-
 .../satellite/module/gatherer/fetcher_gatherer.go  |   3 +-
 .../satellite/module/gatherer/receiver_gatherer.go |   3 +-
 internal/satellite/module/sender/sender.go         |  23 ++-
 plugins/client/kafka/client.go                     |   1 +
 plugins/queue/api/error.go                         |   5 +-
 plugins/queue/mmap/queue.go                        |  29 ++-
 plugins/server/grpc/server.go                      |   7 +-
 plugins/server/http/server.go                      |   1 +
 plugins/server/prometheus/prometheus.go            |   1 +
 16 files changed, 317 insertions(+), 29 deletions(-)

diff --git a/README.md b/README.md
index d2d28da..6915593 100644
--- a/README.md
+++ b/README.md
@@ -38,8 +38,11 @@ If you want to know more details about compiling, please read [the doc](./docs/e
 # Commands
 |  Commands| Flags   | Description  |
 |  ----  | ----  |----  |
-| start  | --config FILE | Start Satellite with the configuration FILE. (default: "configs/satellite_config.yaml" or read value from *SATELLITE_CONFIG* env).|
-| docs  | --output value | Generate Satellite plugin documentations to the output path. (default: "docs" or read value from *SATELLITE_DOC_PATH* env) |
+| start  | --config FILE, -c FILE | Load configuration from FILE. (default: "configs/satellite_config.yaml" or read value from *SATELLITE_CONFIG* env).|
+| start  | --shutdown_hook_time TIME, -t TIME | The hook TIME for graceful shutdown, and the time unit is seconds. (default: "5" or read value from *SATELLITE_SHUTDOWN_HOOK_TIME* env).|
+| start  | --help, -h | Show help.|
+| docs  | --output PATH, -o PATH | The output PATH for the plugin documentation. (default: "docs" or read value from *SATELLITE_DOC_PATH* env) |
+| docs  | --help, -h | Show help.|
 
 
 # Contact Us
diff --git a/cmd/command.go b/cmd/command.go
index 2a96179..1be2ba2 100644
--- a/cmd/command.go
+++ b/cmd/command.go
@@ -18,6 +18,8 @@
 package main
 
 import (
+	"time"
+
 	"github.com/urfave/cli/v2"
 
 	"github.com/apache/skywalking-satellite/internal/satellite/boot"
@@ -31,16 +33,25 @@ var (
 		Usage: "start satellite",
 		Flags: []cli.Flag{
 			&cli.StringFlag{
-				Name:    "config, c",
+				Name:    "config",
+				Aliases: []string{"c"},
 				Usage:   "Load configuration from `FILE`",
 				EnvVars: []string{"SATELLITE_CONFIG"},
 				Value:   "configs/satellite_config.yaml",
 			},
+			&cli.StringFlag{
+				Name:    "shutdown_hook_time",
+				Aliases: []string{"t"},
+				Usage:   "The hook `TIME` for graceful shutdown, and the time unit is seconds.",
+				EnvVars: []string{"SATELLITE_SHUTDOWN_HOOK_TIME"},
+				Value:   "5",
+			},
 		},
 		Action: func(c *cli.Context) error {
 			configPath := c.String("config")
+			shutdownHookTime := time.Second * time.Duration(c.Int("shutdown_hook_time"))
 			cfg := config.Load(configPath)
-			return boot.Start(cfg)
+			return boot.Start(cfg, shutdownHookTime)
 		},
 	}
 
@@ -49,8 +60,9 @@ var (
 		Usage: "generate satellite plugin docs",
 		Flags: []cli.Flag{
 			&cli.StringFlag{
-				Name:    "output, o",
-				Usage:   "The output path for the plugin documentation",
+				Name:    "output",
+				Aliases: []string{"o"},
+				Usage:   "The output `PATH` for the plugin documentation",
 				EnvVars: []string{"SATELLITE_DOC_PATH"},
 				Value:   "docs",
 			},
diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index 19bf595..b79e614 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -83,9 +83,9 @@ pipes:
       # The time interval between two flush operations. And the time unit is millisecond.
       flush_time: ${SATELLITE_LOGPIPE_SENDER_FLUSH_TIME:1000}
       # The maximum buffer elements.
-      max_buffer_size: ${SATELLITE_PIPE1_SENDER_MAX_BUFFER_SIZE:200}
+      max_buffer_size: ${SATELLITE_LOGPIPE_SENDER_MAX_BUFFER_SIZE:200}
       # The minimum flush elements.
-      min_flush_events: ${SATELLITE_PIPE1_SENDER_MIN_FLUSH_EVENTS:100}
+      min_flush_events: ${SATELLITE_LOGPIPE_SENDER_MIN_FLUSH_EVENTS:100}
       client_name: kafka-client
       forwarders:
         - plugin_name: nativelog-kafka-forwarder
diff --git a/docs/en/guides/contribuation/How-to-release.md b/docs/en/guides/contribuation/How-to-release.md
new file mode 100644
index 0000000..354e522
--- /dev/null
+++ b/docs/en/guides/contribuation/How-to-release.md
@@ -0,0 +1,229 @@
+# Apache SkyWalking Satellite Release Guide
+
+This documentation guides the release manager to release the SkyWalking Satellite in the Apache Way, and also helps people to check the release for vote.
+
+## Prerequisites
+
+1. Close (if finished, or move to the next milestone otherwise) all issues in the current milestone from [skywalking-satellite](https://github.com/apache/skywalking-satellite/milestones) and [skywalking](https://github.com/apache/skywalking/milestones), and create a new milestone if needed.
+2. Update [CHANGES.md](../CHANGES.md).
+
+
+## Add your GPG public key to Apache svn
+
+1. Upload your GPG public key to a public GPG site, such as [MIT's site](http://pgp.mit.edu:11371/). 
+
+1. Log in [id.apache.org](https://id.apache.org/) and submit your key fingerprint.
+
+1. Add your GPG public key into the [SkyWalking GPG KEYS](https://dist.apache.org/repos/dist/release/skywalking/KEYS) file, **you can do this only if you are a PMC member**.  You can ask a PMC member for help. **DO NOT overwrite the existing `KEYS` file content, only append your key at the end of the file.**
+
+
+## Build and sign the source code package
+
+```shell
+export VERSION=<the version to release>
+git clone --recurse-submodules git@github.com:apache/skywalking-satellite && cd skywalking-satellite
+git tag -a "$VERSION" -m "Release Apache SkyWalking-Satellite $VERSION"
+git push --tags
+make clean && make release
+```
+
+**In total, six files should be automatically generated in the directory**: `skywalking-satellite-${VERSION}-bin.tgz`, `skywalking-satellite-${VERSION}-src.tgz`, and their corresponding `asc`, `sha512` files.
+
+## Upload to Apache svn
+
+```bash
+svn co https://dist.apache.org/repos/dist/dev/skywalking/
+mkdir -p skywalking/satellite/"$VERSION"
+cp skywalking-satellite/skywalking*.tgz skywalking/satellite/"$VERSION"
+cp skywalking-satellite/skywalking*.tgz.asc skywalking/satellite/"$VERSION"
+cp skywalking-satellite/skywalking-satellite*.tgz.sha512 skywalking/satellite/"$VERSION"
+
+cd skywalking/satellite && svn add "$VERSION" && svn commit -m "Draft Apache SkyWalking-Satellite release $VERSION"
+```
+
+## Make the internal announcement
+
+Send an announcement email to dev@ mailing list, **please check all links before sending the email**.
+
+```text
+Subject: [ANNOUNCEMENT] SkyWalking Satellite $VERSION test build available
+
+Content:
+
+The test build of SkyWalking Satellite $VERSION is now available.
+
+We welcome any comments you may have, and will take all feedback into
+account if a quality vote is called for this build.
+
+Release notes:
+
+ * https://github.com/apache/skywalking-satellite/blob/$VERSION/CHANGES.md
+
+Release Candidate:
+
+ * https://dist.apache.org/repos/dist/dev/skywalking/satellite/$VERSION
+ * sha512 checksums
+   - sha512xxxxyyyzzz skywalking-satellite-x.x.x-bin.tgz
+   - sha512xxxxyyyzzz skywalking-satellite-x.x.x-src.tgz
+
+Release Tag :
+
+ * (Git Tag) v$VERSION
+
+Release Commit Hash :
+
+ * https://github.com/apache/skywalking-satellite/tree/<Git Commit Hash>
+
+Keys to verify the Release Candidate :
+
+ * http://pgp.mit.edu:11371/pks/lookup?op=get&search=0x8BD99F552D9F33D7 corresponding to kezhenxu94@apache.org
+
+Guide to build the release from source :
+
+ * https://github.com/apache/skywalking-satellite/blob/$VERSION/docs/en/guides/contribuation/How-to-release.md
+
+A vote regarding the quality of this test build will be initiated
+within the next couple of days.
+```
+
+## Wait at least 48 hours for test responses
+
+Any PMC member, committer, or contributor can test the features to be released and provide feedback.
+Based on that, PMC will decide whether to start a vote or not.
+
+## Call for vote in dev@ mailing list
+
+Call for vote in `dev@skywalking.apache.org`, **please check all links before sending the email**.
+
+```text
+Subject: [VOTE] Release Apache SkyWalking Satellite version $VERSION
+
+Content:
+
+Hi the SkyWalking Community:
+This is a call for vote to release Apache SkyWalking Satellite version $VERSION.
+
+Release notes:
+
+ * https://github.com/apache/skywalking-satellite/blob/$VERSION/CHANGES.md
+
+Release Candidate:
+
+ * https://dist.apache.org/repos/dist/dev/skywalking/satellite/$VERSION
+ * sha512 checksums
+   - sha512xxxxyyyzzz skywalking-satellite-x.x.x-src.tgz
+   - sha512xxxxyyyzzz skywalking-satellite-x.x.x-bin.tgz
+
+Release Tag :
+
+ * (Git Tag) v$VERSION
+
+Release Commit Hash :
+
+ * https://github.com/apache/skywalking-satellite/tree/<Git Commit Hash>
+
+Keys to verify the Release Candidate :
+
+ * https://dist.apache.org/repos/dist/release/skywalking/KEYS
+
+Guide to build the release from source :
+
+ * https://github.com/apache/skywalking-satellite/blob/$VERSION/docs/en/guides/contribuation/How-to-release.md
+
+Voting will start now and will remain open for at least 72 hours, all PMC members are required to give their votes.
+
+[ ] +1 Release this package.
+[ ] +0 No opinion.
+[ ] -1 Do not release this package because....
+
+Thanks.
+
+[1] https://github.com/apache/skywalking/blob/master/docs/en/guides/How-to-release.md#vote-check
+```
+
+## Vote Check
+
+All PMC members and committers should check these before voting +1:
+
+1. Features test.
+1. All artifacts in staging repository are published with `.asc`, `.md5`, and `sha` files.
+1. Source codes and distribution packages (`skywalking-satellite-$VERSION-{src,bin}.tgz`)
+are in `https://dist.apache.org/repos/dist/dev/skywalking/satellite/$VERSION` with `.asc`, `.sha512`.
+1. `LICENSE` and `NOTICE` are in source codes and distribution package.
+1. Check `shasum -c skywalking-satellite-$VERSION-{src,bin}.tgz.sha512`.
+1. Check `gpg --verify skywalking-satellite-$VERSION-{src,bin}.tgz.asc skywalking-satellite-$VERSION-{src,bin}.tgz`.
+1. Build the distribution from the source code package by following [the build guide](#build-and-sign-the-source-code-package).
+1. Licenses check, `make license`.
+
+Vote result should follow these:
+
+1. A PMC member's vote is a +1 binding vote; all other votes are +1 non-binding.
+
+1. Within 72 hours, you get at least 3 (+1 binding), and have more +1 than -1. Vote pass. 
+
+1. **Send the closing vote mail to announce the result**.  When counting the binding and non-binding votes, please list the names of the voters. For example:
+
+   ```
+   [RESULT][VOTE] Release Apache SkyWalking Satellite version $VERSION
+   
+   3 days passed, we’ve got ($NUMBER) +1 bindings (and ... +1 non-bindings):
+   
+   (list names)
+   +1 bindings:
+   xxx
+   ...
+      
+   +1 non-bindings:
+   xxx
+   ...
+    
+   Thank you for voting, I’ll continue the release process.
+   ```
+
+## Publish release
+
+1. Move the source code tarballs and distributions to `https://dist.apache.org/repos/dist/release/skywalking/`, **you can do this only if you are a PMC member**.
+
+    ```shell
+    export SVN_EDITOR=vim
+    svn mv https://dist.apache.org/repos/dist/dev/skywalking/satellite/$VERSION https://dist.apache.org/repos/dist/release/skywalking/satellite
+    ```
+    
+1. Refer to the previous [PR](https://github.com/apache/skywalking-website/pull/212), update the event and download links on the website.
+
+1. Update [Github release page](https://github.com/apache/skywalking-satellite/releases), follow the previous convention.
+
+1. Send ANNOUNCE email to `dev@skywalking.apache.org` and `announce@apache.org`, the sender should use his/her Apache email account, **please check all links before sending the email**.
+
+    ```
+    Subject: [ANNOUNCEMENT] Apache SkyWalking Satellite $VERSION Released
+
+    Content:
+
+    Hi the SkyWalking Community
+
+    On behalf of the SkyWalking Team, I’m glad to announce that SkyWalking Satellite $VERSION is now released.
+
+    SkyWalking Satellite: A lightweight collector/sidecar that can be deployed close to the target monitored system, to collect metrics, traces, and logs.
+
+    SkyWalking: APM (application performance monitor) tool for distributed systems, especially designed for microservices, cloud native and container-based (Docker, Kubernetes, Mesos) architectures.
+
+    Download Links: http://skywalking.apache.org/downloads/
+
+    Release Notes : https://github.com/apache/skywalking-satellite/blob/$VERSION/CHANGES.md
+
+    Website: http://skywalking.apache.org/
+
+    SkyWalking Satellite Resources:
+    - Issue: https://github.com/apache/skywalking/issues
+    - Mailing list: dev@skywalking.apache.org
+    - Documents: https://github.com/apache/skywalking-satellite/blob/$VERSION/README.md
+    
+    The Apache SkyWalking Team
+    ```
+
+## Remove Unnecessary Releases
+
+Please remember to remove all unnecessary releases from the mirror svn (https://dist.apache.org/repos/dist/release/skywalking/) if you no longer recommend that users choose those versions.
+For example, you have removed the download and documentation links from the website. 
+If they want old ones, the Archive repository has all of them.
diff --git a/docs/en/setup/plugins/server_grpc-server.md b/docs/en/setup/plugins/server_grpc-server.md
index 5a43558..3681be4 100755
--- a/docs/en/setup/plugins/server_grpc-server.md
+++ b/docs/en/setup/plugins/server_grpc-server.md
@@ -12,7 +12,7 @@ max_recv_msg_size: 2097152
 # The max concurrent stream channels.
 max_concurrent_streams: 32
 # The TLS cert file path.
-tls_cert_file: 
+tls_cert_file: ""
 # The TLS key file path.
-tls_key_file: 
+tls_key_file: ""
 ```
diff --git a/internal/satellite/boot/boot.go b/internal/satellite/boot/boot.go
index 8e19e44..acd1adc 100644
--- a/internal/satellite/boot/boot.go
+++ b/internal/satellite/boot/boot.go
@@ -26,6 +26,7 @@ import (
 	"reflect"
 	"sync"
 	"syscall"
+	"time"
 
 	"github.com/sirupsen/logrus"
 
@@ -44,10 +45,11 @@ import (
 type ModuleContainer map[string][]api.Module
 
 // Start Satellite.
-func Start(cfg *config.SatelliteConfig) error {
+func Start(cfg *config.SatelliteConfig, shutdownHookTime time.Duration) error {
 	// Init the global components.
 	log.Init(cfg.Logger)
 	telemetry.Init(cfg.Telemetry)
+	api.ShutdownHookTime = shutdownHookTime
 	// register the supported plugin types to the registry
 	plugins.RegisterPlugins()
 	// use context to receive the external signal.
diff --git a/internal/satellite/module/api/module.go b/internal/satellite/module/api/module.go
index eee4dd5..4f8100f 100644
--- a/internal/satellite/module/api/module.go
+++ b/internal/satellite/module/api/module.go
@@ -19,9 +19,12 @@ package api
 
 import (
 	"context"
+	"time"
 )
 
-// TODO add metrics func
+// ShutdownHookTime is the global shutdown hook time.
+var ShutdownHookTime = time.Second * 5
+
 // Module id a custom plugin interface, which defines the processing.
 type Module interface {
 
diff --git a/internal/satellite/module/gatherer/fetcher_gatherer.go b/internal/satellite/module/gatherer/fetcher_gatherer.go
index 13ae0bf..873b527 100644
--- a/internal/satellite/module/gatherer/fetcher_gatherer.go
+++ b/internal/satellite/module/gatherer/fetcher_gatherer.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	module "github.com/apache/skywalking-satellite/internal/satellite/module/api"
 	"github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
 	"github.com/apache/skywalking-satellite/internal/satellite/telemetry"
 	fetcher "github.com/apache/skywalking-satellite/plugins/fetcher/api"
@@ -74,7 +75,6 @@ func (f *FetcherGatherer) Boot(ctx context.Context) {
 				}
 			case <-childCtx.Done():
 				cancel()
-				f.Shutdown()
 				return
 			}
 		}
@@ -107,6 +107,7 @@ func (f *FetcherGatherer) Boot(ctx context.Context) {
 
 func (f *FetcherGatherer) Shutdown() {
 	log.Logger.Infof("fetcher gatherer module of %s namespace is closing", f.config.PipeName)
+	time.Sleep(module.ShutdownHookTime)
 	if err := f.runningQueue.Close(); err != nil {
 		log.Logger.Errorf("failure occurs when closing %s queue  in %s namespace :%v", f.runningQueue.Name(), f.config.PipeName, err)
 	}
diff --git a/internal/satellite/module/gatherer/receiver_gatherer.go b/internal/satellite/module/gatherer/receiver_gatherer.go
index dcf56ec..84f3f76 100644
--- a/internal/satellite/module/gatherer/receiver_gatherer.go
+++ b/internal/satellite/module/gatherer/receiver_gatherer.go
@@ -26,6 +26,7 @@ import (
 
 	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	module "github.com/apache/skywalking-satellite/internal/satellite/module/api"
 	"github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
 	"github.com/apache/skywalking-satellite/internal/satellite/telemetry"
 	queue "github.com/apache/skywalking-satellite/plugins/queue/api"
@@ -82,7 +83,6 @@ func (r *ReceiverGatherer) Boot(ctx context.Context) {
 				}
 			case <-childCtx.Done():
 				cancel()
-				r.Shutdown()
 				return
 			}
 		}
@@ -118,6 +118,7 @@ func (r *ReceiverGatherer) Boot(ctx context.Context) {
 
 func (r *ReceiverGatherer) Shutdown() {
 	log.Logger.WithField("pipe", r.config.PipeName).Infof("receiver gatherer module is closing...")
+	time.Sleep(module.ShutdownHookTime)
 	if err := r.runningQueue.Close(); err != nil {
 		log.Logger.WithFields(logrus.Fields{
 			"pipe":  r.config.PipeName,
diff --git a/internal/satellite/module/sender/sender.go b/internal/satellite/module/sender/sender.go
index d1f7f64..86eec6d 100644
--- a/internal/satellite/module/sender/sender.go
+++ b/internal/satellite/module/sender/sender.go
@@ -26,6 +26,7 @@ import (
 
 	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	module "github.com/apache/skywalking-satellite/internal/satellite/module/api"
 	"github.com/apache/skywalking-satellite/internal/satellite/module/buffer"
 	gatherer "github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
 	"github.com/apache/skywalking-satellite/internal/satellite/module/sender/api"
@@ -91,10 +92,10 @@ func (s *Sender) Boot(ctx context.Context) {
 			case status := <-s.listener:
 				switch status {
 				case client.Connected:
-					log.Logger.Infof("sender module of %s namespace is notified the connection connected", s.config.PipeName)
+					log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module is connected")
 					s.logicInput = s.physicalInput
 				case client.Disconnect:
-					log.Logger.Infof("sender module of %s namespace is notified the connection disconnected", s.config.PipeName)
+					log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module is disconnected")
 					s.logicInput = nil
 				}
 			case <-timeTicker.C:
@@ -136,16 +137,24 @@ func (s *Sender) Boot(ctx context.Context) {
 // Shutdown closes the channels and tries to force forward the events in the buffer.
 func (s *Sender) Shutdown() {
 	log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is closing")
-	close(s.logicInput)
-	for buf := range s.flushChannel {
-		s.consume(buf)
+	close(s.physicalInput)
+	ticker := time.NewTicker(module.ShutdownHookTime)
+	for {
+		select {
+		case <-ticker.C:
+			s.consume(s.buffer)
+			return
+		case b := <-s.flushChannel:
+			s.consume(b)
+		}
 	}
-	s.consume(s.buffer)
-	close(s.flushChannel)
 }
 
 // consume would forward the events by type and ack this batch.
 func (s *Sender) consume(batch *buffer.BatchBuffer) {
+	if batch.Len() == 0 {
+		return
+	}
 	log.Logger.WithFields(logrus.Fields{
 		"pipe":   s.config.PipeName,
 		"offset": batch.Last(),
diff --git a/plugins/client/kafka/client.go b/plugins/client/kafka/client.go
index 7f94432..63c0362 100644
--- a/plugins/client/kafka/client.go
+++ b/plugins/client/kafka/client.go
@@ -138,6 +138,7 @@ func (c *Client) Prepare() error {
 
 func (c *Client) Close() error {
 	c.cancel()
+	defer log.Logger.Info("kafka client is closed")
 	return c.client.Close()
 }
 
diff --git a/plugins/queue/api/error.go b/plugins/queue/api/error.go
index 05c7cec..ed6c830 100644
--- a/plugins/queue/api/error.go
+++ b/plugins/queue/api/error.go
@@ -20,6 +20,7 @@ package api
 import "errors"
 
 var (
-	ErrEmpty = errors.New("cannot read data when the queue is empty")
-	ErrFull  = errors.New("cannot write data when the queue is full")
+	ErrEmpty  = errors.New("cannot read data when the queue is empty")
+	ErrFull   = errors.New("cannot write data when the queue is full")
+	ErrClosed = errors.New("cannot enqueue or dequeue when the queue is closed")
 )
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
index 88b1f1e..44d20ee 100644
--- a/plugins/queue/mmap/queue.go
+++ b/plugins/queue/mmap/queue.go
@@ -29,7 +29,10 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/sirupsen/logrus"
+
 	"github.com/grandecola/mmap"
+
 	"google.golang.org/protobuf/proto"
 
 	"github.com/apache/skywalking-satellite/internal/pkg/config"
@@ -67,7 +70,8 @@ type Queue struct {
 	flushChannel           chan struct{}  // The flushChannel channel would receive a signal when the unflushedNum reach the flush_ceiling_num.
 	insufficientMemChannel chan struct{}  // Notify when memory is insufficient
 	sufficientMemChannel   chan struct{}  // Notify when memory is sufficient
-	markReadChannel        chan int64
+	markReadChannel        chan int64     // Transfer the read segmentID to do unmap operation.
+	ready                  bool           // The status of the queue.
 
 	// control components
 	ctx        context.Context    // Parent ctx
@@ -143,10 +147,15 @@ func (q *Queue) Initialize() error {
 	q.showDownWg.Add(2)
 	go q.segmentSwapper()
 	go q.flush()
+	q.ready = true
 	return nil
 }
 
 func (q *Queue) Enqueue(e *protocol.Event) error {
+	if !q.ready {
+		log.Logger.WithField("pipe", q.CommonFields.PipeName).Warnf("the enqueue operation would be ignored because the queue was closed.")
+		return api.ErrClosed
+	}
 	data, err := proto.Marshal(e)
 	if err != nil {
 		return err
@@ -158,6 +167,10 @@ func (q *Queue) Enqueue(e *protocol.Event) error {
 }
 
 func (q *Queue) Dequeue() (*api.SequenceEvent, error) {
+	if !q.ready {
+		log.Logger.WithField("pipe", q.CommonFields.PipeName).Warnf("the dequeue operation would be ignored because the queue was closed.")
+		return nil, api.ErrClosed
+	}
 	data, id, offset, err := q.dequeue()
 	if err != nil {
 		return nil, err
@@ -180,8 +193,10 @@ func (q *Queue) Close() error {
 	q.showDownWg.Wait()
 	for i, segment := range q.segments {
 		if segment != nil {
-			err := segment.Unmap()
-			if err != nil {
+			if err := segment.Flush(syscall.MS_SYNC); err != nil {
+				log.Logger.Errorf("cannot unmap the segments: %d, %v", i, err)
+			}
+			if err := segment.Unmap(); err != nil {
 				log.Logger.Errorf("cannot unmap the segments: %d, %v", i, err)
 			}
 		}
@@ -189,10 +204,18 @@ func (q *Queue) Close() error {
 	if err := q.meta.Close(); err != nil {
 		log.Logger.Errorf("cannot unmap the metadata: %v", err)
 	}
+	q.ready = false
 	return nil
 }
 
 func (q *Queue) Ack(lastOffset event.Offset) {
+	if !q.ready {
+		log.Logger.WithFields(logrus.Fields{
+			"pipe":   q.CommonFields.PipeName,
+			"offset": lastOffset,
+		}).Warnf("the ack operation would be ignored because the queue was closed.")
+		return
+	}
 	id, offset, err := q.decodeOffset(lastOffset)
 	if err != nil {
 		log.Logger.Errorf("cannot ack queue with the offset:%s", lastOffset)
diff --git a/plugins/server/grpc/server.go b/plugins/server/grpc/server.go
index 0d1e1bd..3351823 100644
--- a/plugins/server/grpc/server.go
+++ b/plugins/server/grpc/server.go
@@ -61,9 +61,9 @@ max_recv_msg_size: 2097152
 # The max concurrent stream channels.
 max_concurrent_streams: 32
 # The TLS cert file path.
-tls_cert_file: 
+tls_cert_file: ""
 # The TLS key file path.
-tls_key_file: 
+tls_key_file: ""
 `
 }
 
@@ -99,7 +99,8 @@ func (s *Server) Start() error {
 }
 
 func (s *Server) Close() error {
-	s.server.GracefulStop()
+	s.server.Stop()
+	log.Logger.Info("grpc server is closed")
 	return nil
 }
 
diff --git a/plugins/server/http/server.go b/plugins/server/http/server.go
index c26d86e..7c7810d 100644
--- a/plugins/server/http/server.go
+++ b/plugins/server/http/server.go
@@ -64,6 +64,7 @@ func (s *Server) Start() error {
 }
 
 func (s *Server) Close() error {
+	log.Logger.Info("http server is closed")
 	return nil
 }
 
diff --git a/plugins/server/prometheus/prometheus.go b/plugins/server/prometheus/prometheus.go
index fa4bc53..8df5e77 100644
--- a/plugins/server/prometheus/prometheus.go
+++ b/plugins/server/prometheus/prometheus.go
@@ -77,6 +77,7 @@ func (s *Server) Start() error {
 }
 
 func (s *Server) Close() error {
+	log.Logger.Info("prometheus server is closed")
 	return nil
 }