You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2021/02/20 15:00:47 UTC
[skywalking-satellite] branch main updated: polish codes for 0.1.0
release (#29)
This is an automated email from the ASF dual-hosted git repository.
liujiapeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new 917c09b polish codes for 0.1.0 release (#29)
917c09b is described below
commit 917c09b6a930968c233d513f3d8673cd34546702
Author: Evan <31...@users.noreply.github.com>
AuthorDate: Sat Feb 20 23:00:41 2021 +0800
polish codes for 0.1.0 release (#29)
Co-authored-by: Evan <ev...@outlook.com>
---
README.md | 7 +-
cmd/command.go | 20 +-
configs/satellite_config.yaml | 4 +-
docs/en/guides/contribuation/How-to-release.md | 229 +++++++++++++++++++++
docs/en/setup/plugins/server_grpc-server.md | 4 +-
internal/satellite/boot/boot.go | 4 +-
internal/satellite/module/api/module.go | 5 +-
.../satellite/module/gatherer/fetcher_gatherer.go | 3 +-
.../satellite/module/gatherer/receiver_gatherer.go | 3 +-
internal/satellite/module/sender/sender.go | 23 ++-
plugins/client/kafka/client.go | 1 +
plugins/queue/api/error.go | 5 +-
plugins/queue/mmap/queue.go | 29 ++-
plugins/server/grpc/server.go | 7 +-
plugins/server/http/server.go | 1 +
plugins/server/prometheus/prometheus.go | 1 +
16 files changed, 317 insertions(+), 29 deletions(-)
diff --git a/README.md b/README.md
index d2d28da..6915593 100644
--- a/README.md
+++ b/README.md
@@ -38,8 +38,11 @@ If you want to know more details about compiling, please read [the doc](./docs/e
# Commands
| Commands| Flags | Description |
| ---- | ---- |---- |
-| start | --config FILE | Start Satellite with the configuration FILE. (default: "configs/satellite_config.yaml" or read value from *SATELLITE_CONFIG* env).|
-| docs | --output value | Generate Satellite plugin documentations to the output path. (default: "docs" or read value from *SATELLITE_DOC_PATH* env) |
+| start | --config FILE, -c FILE | Load configuration from FILE. (default: "configs/satellite_config.yaml" or read value from *SATELLITE_CONFIG* env).|
+| start | ---shutdown_hook_time TIME, -t TIME | The hook TIME for graceful shutdown, and the time unit is seconds. (default: "5" or read value from *SATELLITE_SHUTDOWN_HOOK_TIME* env).|
+| start | --help, -h | Show help.|
+| docs | --output PATH, -o PATH | The output PATH for the plugin documentation. (default: "docs" or read value from *SATELLITE_DOC_PATH* env) |
+| docs | --help, -h | Show help.|
# Contact Us
diff --git a/cmd/command.go b/cmd/command.go
index 2a96179..1be2ba2 100644
--- a/cmd/command.go
+++ b/cmd/command.go
@@ -18,6 +18,8 @@
package main
import (
+ "time"
+
"github.com/urfave/cli/v2"
"github.com/apache/skywalking-satellite/internal/satellite/boot"
@@ -31,16 +33,25 @@ var (
Usage: "start satellite",
Flags: []cli.Flag{
&cli.StringFlag{
- Name: "config, c",
+ Name: "config",
+ Aliases: []string{"c"},
Usage: "Load configuration from `FILE`",
EnvVars: []string{"SATELLITE_CONFIG"},
Value: "configs/satellite_config.yaml",
},
+ &cli.StringFlag{
+ Name: "shutdown_hook_time",
+ Aliases: []string{"t"},
+ Usage: "The hook `TIME` for graceful shutdown, and the time unit is seconds.",
+ EnvVars: []string{"SATELLITE_SHUTDOWN_HOOK_TIME"},
+ Value: "5",
+ },
},
Action: func(c *cli.Context) error {
configPath := c.String("config")
+ shutdownHookTime := time.Second * time.Duration(c.Int("shutdown_hook_time"))
cfg := config.Load(configPath)
- return boot.Start(cfg)
+ return boot.Start(cfg, shutdownHookTime)
},
}
@@ -49,8 +60,9 @@ var (
Usage: "generate satellite plugin docs",
Flags: []cli.Flag{
&cli.StringFlag{
- Name: "output, o",
- Usage: "The output path for the plugin documentation",
+ Name: "output",
+ Aliases: []string{"o"},
+ Usage: "The output `PATH` for the plugin documentation",
EnvVars: []string{"SATELLITE_DOC_PATH"},
Value: "docs",
},
diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index 19bf595..b79e614 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -83,9 +83,9 @@ pipes:
# The time interval between two flush operations. And the time unit is millisecond.
flush_time: ${SATELLITE_LOGPIPE_SENDER_FLUSH_TIME:1000}
# The maximum buffer elements.
- max_buffer_size: ${SATELLITE_PIPE1_SENDER_MAX_BUFFER_SIZE:200}
+ max_buffer_size: ${SATELLITE_LOGPIPE_SENDER_MAX_BUFFER_SIZE:200}
# The minimum flush elements.
- min_flush_events: ${SATELLITE_PIPE1_SENDER_MIN_FLUSH_EVENTS:100}
+ min_flush_events: ${SATELLITE_LOGPIPE_SENDER_MIN_FLUSH_EVENTS:100}
client_name: kafka-client
forwarders:
- plugin_name: nativelog-kafka-forwarder
diff --git a/docs/en/guides/contribuation/How-to-release.md b/docs/en/guides/contribuation/How-to-release.md
new file mode 100644
index 0000000..354e522
--- /dev/null
+++ b/docs/en/guides/contribuation/How-to-release.md
@@ -0,0 +1,229 @@
+# Apache SkyWalking Satellite Release Guide
+
+This documentation guides the release manager to release the SkyWalking Satellite in the Apache Way, and also helps people to check the release for vote.
+
+## Prerequisites
+
+1. Close(if finished, or move to next milestone otherwise) all issues in the current milestone from [skywalking-satellite](https://github.com/apache/skywalking-satellite/milestones) and [skywalking](https://github.com/apache/skywalking/milestones), create a new milestone if needed.
+2. Update [CHANGES.md](../CHANGES.md).
+
+
+## Add your GPG public key to Apache svn
+
+1. Upload your GPG public key to a public GPG site, such as [MIT's site](http://pgp.mit.edu:11371/).
+
+1. Log in [id.apache.org](https://id.apache.org/) and submit your key fingerprint.
+
+1. Add your GPG public key into [SkyWalking GPG KEYS](https://dist.apache.org/repos/dist/release/skywalking/KEYS) file, **you can do this only if you are a PMC member**. You can ask a PMC member for help. **DO NOT override the existed `KEYS` file content, only append your key at the end of the file.**
+
+
+## Build and sign the source code package
+
+```shell
+export VERSION=<the version to release>
+git clone --recurse-submodules git@github.com:apache/skywalking-satellite && cd skywalking-satellite
+git tag -a "$VERSION" -m "Release Apache SkyWalking-Satellite $VERSION"
+git push --tags
+make clean && make release
+```
+
+**In total, six files should be automatically generated in the directory**: `skywalking-satellite-${VERSION}-bin.tgz`, `skywalking-satellite-${VERSION}-src.tgz`, and their corresponding `asc`, `sha512` files.
+
+## Upload to Apache svn
+
+```bash
+svn co https://dist.apache.org/repos/dist/dev/skywalking/
+mkdir -p skywalking/satellite/"$VERSION"
+cp skywalking-satellite/skywalking*.tgz skywalking/satellite/"$VERSION"
+cp skywalking-satellite/skywalking*.tgz.asc skywalking/satellite/"$VERSION"
+cp skywalking-satellite/skywalking-satellite*.tgz.sha512 skywalking/satellite/"$VERSION"
+
+cd skywalking/satellite && svn add "$VERSION" && svn commit -m "Draft Apache SkyWalking-Satellite release $VERSION"
+```
+
+## Make the internal announcement
+
+Send an announcement email to dev@ mailing list, **please check all links before sending the email**.
+
+```text
+Subject: [ANNOUNCEMENT] SkyWalking Satellite $VERSION test build available
+
+Content:
+
+The test build of SkyWalking Satellite $VERSION is now available.
+
+We welcome any comments you may have, and will take all feedback into
+account if a quality vote is called for this build.
+
+Release notes:
+
+ * https://github.com/apache/skywalking-satellite/blob/$VERSION/CHANGES.md
+
+Release Candidate:
+
+ * https://dist.apache.org/repos/dist/dev/skywalking/satellite/$VERSION
+ * sha512 checksums
+ - sha512xxxxyyyzzz apache-skywalking-satellite-bin-x.x.x.tgz
+ - sha512xxxxyyyzzz apache-skywalking-satellite-src-x.x.x.tgz
+
+Release Tag :
+
+ * (Git Tag) v$VERSION
+
+Release Commit Hash :
+
+ * https://github.com/apache/skywalking-satellite/tree/<Git Commit Hash>
+
+Keys to verify the Release Candidate :
+
+ * http://pgp.mit.edu:11371/pks/lookup?op=get&search=0x8BD99F552D9F33D7 corresponding to kezhenxu94@apache.org
+
+Guide to build the release from source :
+
+ * https://github.com/apache/skywalking-satellite/blob/$VERSION/docs/en/guides/contribuation/How-to-release.md
+
+A vote regarding the quality of this test build will be initiated
+within the next couple of days.
+```
+
+## Wait at least 48 hours for test responses
+
+Any PMC, committer or contributor can test features for releasing, and feedback.
+Based on that, PMC will decide whether to start a vote or not.
+
+## Call for vote in dev@ mailing list
+
+Call for vote in `dev@skywalking.apache.org`, **please check all links before sending the email**.
+
+```text
+Subject: [VOTE] Release Apache SkyWalking Satellite version $VERSION
+
+Content:
+
+Hi the SkyWalking Community:
+This is a call for vote to release Apache SkyWalking Satellite version $VERSION.
+
+Release notes:
+
+ * https://github.com/apache/skywalking-satellite/blob/$VERSION/CHANGES.md
+
+Release Candidate:
+
+ * https://dist.apache.org/repos/dist/dev/skywalking/satellite/$VERSION
+ * sha512 checksums
+ - sha512xxxxyyyzzz skywalking-satellite-x.x.x-src.tgz
+ - sha512xxxxyyyzzz skywalking-satellite-x.x.x-bin.tgz
+
+Release Tag :
+
+ * (Git Tag) v$VERSION
+
+Release Commit Hash :
+
+ * https://github.com/apache/skywalking-satellite/tree/<Git Commit Hash>
+
+Keys to verify the Release Candidate :
+
+ * https://dist.apache.org/repos/dist/release/skywalking/KEYS
+
+Guide to build the release from source :
+
+ * https://github.com/apache/skywalking-satellite/blob/$VERSION/docs/en/guides/contribuation/How-to-release.md
+
+Voting will start now and will remain open for at least 72 hours, all PMC members are required to give their votes.
+
+[ ] +1 Release this package.
+[ ] +0 No opinion.
+[ ] -1 Do not release this package because....
+
+Thanks.
+
+[1] https://github.com/apache/skywalking/blob/master/docs/en/guides/How-to-release.md#vote-check
+```
+
+## Vote Check
+
+All PMC members and committers should check these before voting +1:
+
+1. Features test.
+1. All artifacts in staging repository are published with `.asc`, `.md5`, and `sha` files.
+1. Source codes and distribution packages (`skywalking-satellite-$VERSION-{src,bin}.tgz`)
+are in `https://dist.apache.org/repos/dist/dev/skywalking/satellite/$VERSION` with `.asc`, `.sha512`.
+1. `LICENSE` and `NOTICE` are in source codes and distribution package.
+1. Check `shasum -c skywalking-satellite-$VERSION-{src,bin}.tgz.sha512`.
+1. Check `gpg --verify skywalking-satellite-$VERSION-{src,bin}.tgz.asc skywalking-satellite-$VERSION-{src,bin}.tgz`.
+1. Build distribution from source code package by following this [the build guide](#build-and-sign-the-source-code-package).
+1. Licenses check, `make license`.
+
+Vote result should follow these:
+
+1. PMC vote is +1 binding, all others is +1 no binding.
+
+1. Within 72 hours, you get at least 3 (+1 binding), and have more +1 than -1. Vote pass.
+
+1. **Send the closing vote mail to announce the result**. When count the binding and no binding votes, please list the names of voters. An example like this:
+
+ ```
+ [RESULT][VOTE] Release Apache SkyWalking Satellite version $VERSION
+
+ 3 days passed, we’ve got ($NUMBER) +1 bindings (and ... +1 non-bindings):
+
+ (list names)
+ +1 bindings:
+ xxx
+ ...
+
+ +1 non-bindings:
+ xxx
+ ...
+
+ Thank you for voting, I’ll continue the release process.
+ ```
+
+## Publish release
+
+1. Move source codes tar balls and distributions to `https://dist.apache.org/repos/dist/release/skywalking/`, **you can do this only if you are a PMC member**.
+
+ ```shell
+ export SVN_EDITOR=vim
+ svn mv https://dist.apache.org/repos/dist/dev/skywalking/satellite/$VERSION https://dist.apache.org/repos/dist/release/skywalking/satellite
+ ```
+
+1. Refer to the previous [PR](https://github.com/apache/skywalking-website/pull/212), update the event and download links on the website.
+
+1. Update [Github release page](https://github.com/apache/skywalking-satellite/releases), follow the previous convention.
+
+1. Send ANNOUNCE email to `dev@skywalking.apache.org` and `announce@apache.org`, the sender should use his/her Apache email account, **please check all links before sending the email**.
+
+ ```
+ Subject: [ANNOUNCEMENT] Apache SkyWalking Satellite $VERSION Released
+
+ Content:
+
+ Hi the SkyWalking Community
+
+ On behalf of the SkyWalking Team, I’m glad to announce that SkyWalking Satellite $VERSION is now released.
+
+ SkyWalking Satellite: A lightweight collector/sidecar could be deployed closing to the target monitored system, to collect metrics, traces, and logs.
+
+ SkyWalking: APM (application performance monitor) tool for distributed systems, especially designed for microservices, cloud native and container-based (Docker, Kubernetes, Mesos) architectures.
+
+ Download Links: http://skywalking.apache.org/downloads/
+
+ Release Notes : https://github.com/apache/skywalking-satellite/blob/$VERSION/CHANGES.md
+
+ Website: http://skywalking.apache.org/
+
+ SkyWalking Satellite Resources:
+ - Issue: https://github.com/apache/skywalking/issues
+ - Mailing list: dev@skywalking.apache.org
+ - Documents: https://github.com/apache/skywalking-satellite/blob/$VERSION/README.md
+
+ The Apache SkyWalking Team
+ ```
+
+## Remove Unnecessary Releases
+
+Please remember to remove all unnecessary releases in the mirror svn (https://dist.apache.org/repos/dist/release/skywalking/), if you don't recommend users to choose those version.
+For example, you have removed the download and documentation links from the website.
+If they want old ones, the Archive repository has all of them.
diff --git a/docs/en/setup/plugins/server_grpc-server.md b/docs/en/setup/plugins/server_grpc-server.md
index 5a43558..3681be4 100755
--- a/docs/en/setup/plugins/server_grpc-server.md
+++ b/docs/en/setup/plugins/server_grpc-server.md
@@ -12,7 +12,7 @@ max_recv_msg_size: 2097152
# The max concurrent stream channels.
max_concurrent_streams: 32
# The TLS cert file path.
-tls_cert_file:
+tls_cert_file: ""
# The TLS key file path.
-tls_key_file:
+tls_key_file: ""
```
diff --git a/internal/satellite/boot/boot.go b/internal/satellite/boot/boot.go
index 8e19e44..acd1adc 100644
--- a/internal/satellite/boot/boot.go
+++ b/internal/satellite/boot/boot.go
@@ -26,6 +26,7 @@ import (
"reflect"
"sync"
"syscall"
+ "time"
"github.com/sirupsen/logrus"
@@ -44,10 +45,11 @@ import (
type ModuleContainer map[string][]api.Module
// Start Satellite.
-func Start(cfg *config.SatelliteConfig) error {
+func Start(cfg *config.SatelliteConfig, shutdownHookTime time.Duration) error {
// Init the global components.
log.Init(cfg.Logger)
telemetry.Init(cfg.Telemetry)
+ api.ShutdownHookTime = shutdownHookTime
// register the supported plugin types to the registry
plugins.RegisterPlugins()
// use context to receive the external signal.
diff --git a/internal/satellite/module/api/module.go b/internal/satellite/module/api/module.go
index eee4dd5..4f8100f 100644
--- a/internal/satellite/module/api/module.go
+++ b/internal/satellite/module/api/module.go
@@ -19,9 +19,12 @@ package api
import (
"context"
+ "time"
)
-// TODO add metrics func
+// ShutdownHookTime is the global shutdown hook time.
+var ShutdownHookTime = time.Second * 5
+
// Module id a custom plugin interface, which defines the processing.
type Module interface {
diff --git a/internal/satellite/module/gatherer/fetcher_gatherer.go b/internal/satellite/module/gatherer/fetcher_gatherer.go
index 13ae0bf..873b527 100644
--- a/internal/satellite/module/gatherer/fetcher_gatherer.go
+++ b/internal/satellite/module/gatherer/fetcher_gatherer.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
+ module "github.com/apache/skywalking-satellite/internal/satellite/module/api"
"github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
"github.com/apache/skywalking-satellite/internal/satellite/telemetry"
fetcher "github.com/apache/skywalking-satellite/plugins/fetcher/api"
@@ -74,7 +75,6 @@ func (f *FetcherGatherer) Boot(ctx context.Context) {
}
case <-childCtx.Done():
cancel()
- f.Shutdown()
return
}
}
@@ -107,6 +107,7 @@ func (f *FetcherGatherer) Boot(ctx context.Context) {
func (f *FetcherGatherer) Shutdown() {
log.Logger.Infof("fetcher gatherer module of %s namespace is closing", f.config.PipeName)
+ time.Sleep(module.ShutdownHookTime)
if err := f.runningQueue.Close(); err != nil {
log.Logger.Errorf("failure occurs when closing %s queue in %s namespace :%v", f.runningQueue.Name(), f.config.PipeName, err)
}
diff --git a/internal/satellite/module/gatherer/receiver_gatherer.go b/internal/satellite/module/gatherer/receiver_gatherer.go
index dcf56ec..84f3f76 100644
--- a/internal/satellite/module/gatherer/receiver_gatherer.go
+++ b/internal/satellite/module/gatherer/receiver_gatherer.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
+ module "github.com/apache/skywalking-satellite/internal/satellite/module/api"
"github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
"github.com/apache/skywalking-satellite/internal/satellite/telemetry"
queue "github.com/apache/skywalking-satellite/plugins/queue/api"
@@ -82,7 +83,6 @@ func (r *ReceiverGatherer) Boot(ctx context.Context) {
}
case <-childCtx.Done():
cancel()
- r.Shutdown()
return
}
}
@@ -118,6 +118,7 @@ func (r *ReceiverGatherer) Boot(ctx context.Context) {
func (r *ReceiverGatherer) Shutdown() {
log.Logger.WithField("pipe", r.config.PipeName).Infof("receiver gatherer module is closing...")
+ time.Sleep(module.ShutdownHookTime)
if err := r.runningQueue.Close(); err != nil {
log.Logger.WithFields(logrus.Fields{
"pipe": r.config.PipeName,
diff --git a/internal/satellite/module/sender/sender.go b/internal/satellite/module/sender/sender.go
index d1f7f64..86eec6d 100644
--- a/internal/satellite/module/sender/sender.go
+++ b/internal/satellite/module/sender/sender.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
+ module "github.com/apache/skywalking-satellite/internal/satellite/module/api"
"github.com/apache/skywalking-satellite/internal/satellite/module/buffer"
gatherer "github.com/apache/skywalking-satellite/internal/satellite/module/gatherer/api"
"github.com/apache/skywalking-satellite/internal/satellite/module/sender/api"
@@ -91,10 +92,10 @@ func (s *Sender) Boot(ctx context.Context) {
case status := <-s.listener:
switch status {
case client.Connected:
- log.Logger.Infof("sender module of %s namespace is notified the connection connected", s.config.PipeName)
+ log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module is connected")
s.logicInput = s.physicalInput
case client.Disconnect:
- log.Logger.Infof("sender module of %s namespace is notified the connection disconnected", s.config.PipeName)
+ log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module is disconnected")
s.logicInput = nil
}
case <-timeTicker.C:
@@ -136,16 +137,24 @@ func (s *Sender) Boot(ctx context.Context) {
// Shutdown closes the channels and tries to force forward the events in the buffer.
func (s *Sender) Shutdown() {
log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is closing")
- close(s.logicInput)
- for buf := range s.flushChannel {
- s.consume(buf)
+ close(s.physicalInput)
+ ticker := time.NewTicker(module.ShutdownHookTime)
+ for {
+ select {
+ case <-ticker.C:
+ s.consume(s.buffer)
+ return
+ case b := <-s.flushChannel:
+ s.consume(b)
+ }
}
- s.consume(s.buffer)
- close(s.flushChannel)
}
// consume would forward the events by type and ack this batch.
func (s *Sender) consume(batch *buffer.BatchBuffer) {
+ if batch.Len() == 0 {
+ return
+ }
log.Logger.WithFields(logrus.Fields{
"pipe": s.config.PipeName,
"offset": batch.Last(),
diff --git a/plugins/client/kafka/client.go b/plugins/client/kafka/client.go
index 7f94432..63c0362 100644
--- a/plugins/client/kafka/client.go
+++ b/plugins/client/kafka/client.go
@@ -138,6 +138,7 @@ func (c *Client) Prepare() error {
func (c *Client) Close() error {
c.cancel()
+ defer log.Logger.Info("kafka client is closed")
return c.client.Close()
}
diff --git a/plugins/queue/api/error.go b/plugins/queue/api/error.go
index 05c7cec..ed6c830 100644
--- a/plugins/queue/api/error.go
+++ b/plugins/queue/api/error.go
@@ -20,6 +20,7 @@ package api
import "errors"
var (
- ErrEmpty = errors.New("cannot read data when the queue is empty")
- ErrFull = errors.New("cannot write data when the queue is full")
+ ErrEmpty = errors.New("cannot read data when the queue is empty")
+ ErrFull = errors.New("cannot write data when the queue is full")
+ ErrClosed = errors.New("cannot enqueue or dequeue when the queue is closed")
)
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
index 88b1f1e..44d20ee 100644
--- a/plugins/queue/mmap/queue.go
+++ b/plugins/queue/mmap/queue.go
@@ -29,7 +29,10 @@ import (
"syscall"
"time"
+ "github.com/sirupsen/logrus"
+
"github.com/grandecola/mmap"
+
"google.golang.org/protobuf/proto"
"github.com/apache/skywalking-satellite/internal/pkg/config"
@@ -67,7 +70,8 @@ type Queue struct {
flushChannel chan struct{} // The flushChannel channel would receive a signal when the unflushedNum reach the flush_ceiling_num.
insufficientMemChannel chan struct{} // Notify when memory is insufficient
sufficientMemChannel chan struct{} // Notify when memory is sufficient
- markReadChannel chan int64
+ markReadChannel chan int64 // Transfer the read segmentID to do ummap operation.
+ ready bool // The status of the queue.
// control components
ctx context.Context // Parent ctx
@@ -143,10 +147,15 @@ func (q *Queue) Initialize() error {
q.showDownWg.Add(2)
go q.segmentSwapper()
go q.flush()
+ q.ready = true
return nil
}
func (q *Queue) Enqueue(e *protocol.Event) error {
+ if !q.ready {
+ log.Logger.WithField("pipe", q.CommonFields.PipeName).Warnf("the enqueue operation would be ignored because the queue was closed.")
+ return api.ErrClosed
+ }
data, err := proto.Marshal(e)
if err != nil {
return err
@@ -158,6 +167,10 @@ func (q *Queue) Enqueue(e *protocol.Event) error {
}
func (q *Queue) Dequeue() (*api.SequenceEvent, error) {
+ if !q.ready {
+ log.Logger.WithField("pipe", q.CommonFields.PipeName).Warnf("the dequeue operation would be ignored because the queue was closed.")
+ return nil, api.ErrClosed
+ }
data, id, offset, err := q.dequeue()
if err != nil {
return nil, err
@@ -180,8 +193,10 @@ func (q *Queue) Close() error {
q.showDownWg.Wait()
for i, segment := range q.segments {
if segment != nil {
- err := segment.Unmap()
- if err != nil {
+ if err := segment.Flush(syscall.MS_SYNC); err != nil {
+ log.Logger.Errorf("cannot unmap the segments: %d, %v", i, err)
+ }
+ if err := segment.Unmap(); err != nil {
log.Logger.Errorf("cannot unmap the segments: %d, %v", i, err)
}
}
@@ -189,10 +204,18 @@ func (q *Queue) Close() error {
if err := q.meta.Close(); err != nil {
log.Logger.Errorf("cannot unmap the metadata: %v", err)
}
+ q.ready = false
return nil
}
func (q *Queue) Ack(lastOffset event.Offset) {
+ if !q.ready {
+ log.Logger.WithFields(logrus.Fields{
+ "pipe": q.CommonFields.PipeName,
+ "offset": lastOffset,
+ }).Warnf("the ack operation would be ignored because the queue was closed.")
+ return
+ }
id, offset, err := q.decodeOffset(lastOffset)
if err != nil {
log.Logger.Errorf("cannot ack queue with the offset:%s", lastOffset)
diff --git a/plugins/server/grpc/server.go b/plugins/server/grpc/server.go
index 0d1e1bd..3351823 100644
--- a/plugins/server/grpc/server.go
+++ b/plugins/server/grpc/server.go
@@ -61,9 +61,9 @@ max_recv_msg_size: 2097152
# The max concurrent stream channels.
max_concurrent_streams: 32
# The TLS cert file path.
-tls_cert_file:
+tls_cert_file: ""
# The TLS key file path.
-tls_key_file:
+tls_key_file: ""
`
}
@@ -99,7 +99,8 @@ func (s *Server) Start() error {
}
func (s *Server) Close() error {
- s.server.GracefulStop()
+ s.server.Stop()
+ log.Logger.Info("grpc server is closed")
return nil
}
diff --git a/plugins/server/http/server.go b/plugins/server/http/server.go
index c26d86e..7c7810d 100644
--- a/plugins/server/http/server.go
+++ b/plugins/server/http/server.go
@@ -64,6 +64,7 @@ func (s *Server) Start() error {
}
func (s *Server) Close() error {
+ log.Logger.Info("http server is closed")
return nil
}
diff --git a/plugins/server/prometheus/prometheus.go b/plugins/server/prometheus/prometheus.go
index fa4bc53..8df5e77 100644
--- a/plugins/server/prometheus/prometheus.go
+++ b/plugins/server/prometheus/prometheus.go
@@ -77,6 +77,7 @@ func (s *Server) Start() error {
}
func (s *Server) Close() error {
+ log.Logger.Info("prometheus server is closed")
return nil
}