You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/04/14 11:43:22 UTC
[skywalking-banyandb] branch main updated: Introduce run module to
control the lifecycle of components
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 8b6a6af Introduce run module to control the lifecycle of components
8b6a6af is described below
commit 8b6a6afa108ec425211c8ef4b4f39bc81e634a6b
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Apr 14 19:39:57 2021 +0800
Introduce run module to control the lifecycle of components
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/executor/executor.go | 46 +++--
banyand/index/index.go | 32 ++-
banyand/internal/bus/bus.go | 23 +--
banyand/internal/cmd/standalone.go | 76 +++----
banyand/series/series.go | 32 ++-
banyand/shard/shard.go | 43 ++--
banyand/storage/pipeline.go | 61 ++++++
go.mod | 3 +-
go.sum | 2 +
pkg/logger/logger.go | 26 +--
pkg/logger/setting.go | 4 +-
pkg/logger/setting_test.go | 6 +-
pkg/run/run.go | 391 +++++++++++++++++++++++++++++++++++++
pkg/signal/handler.go | 70 +++++++
pkg/version/version.go | 38 ++++
15 files changed, 725 insertions(+), 128 deletions(-)
diff --git a/banyand/executor/executor.go b/banyand/executor/executor.go
index ac25e9e..bedc415 100644
--- a/banyand/executor/executor.go
+++ b/banyand/executor/executor.go
@@ -23,29 +23,47 @@ import (
"github.com/apache/skywalking-banyandb/banyand/internal/bus"
"github.com/apache/skywalking-banyandb/banyand/storage"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
-var _ bus.MessageListener = (*Executor)(nil)
+const name = "executor"
+
+var (
+ _ bus.MessageListener = (*Executor)(nil)
+ _ run.PreRunner = (*Executor)(nil)
+ _ storage.DataSubscriber = (*Executor)(nil)
+ _ storage.DataPublisher = (*Executor)(nil)
+)
type Executor struct {
- log *logger.Logger
- bus *bus.Bus
+ log *logger.Logger
+ publisher bus.Publisher
}
-func NewExecutor(bus *bus.Bus) *Executor {
- return &Executor{
- bus: bus,
- log: logger.GetLogger("executor"),
- }
+func (s *Executor) Pub(publisher bus.Publisher) error {
+ s.publisher = publisher
+ return nil
}
-func (s Executor) Rev(message bus.Message) {
- s.log.Info("rev", logger.Any("msg", message.Data()))
- _ = s.bus.Publish(storage.TraceIndex, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "index message"))
- _ = s.bus.Publish(storage.TraceData, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "data message"))
+func (s *Executor) ComponentName() string {
+ return name
+}
+
+func (s *Executor) Sub(subscriber bus.Subscriber) error {
+ return subscriber.Subscribe(storage.TraceRaw, s)
}
-func (s Executor) Close() error {
- s.log.Info("closed")
+func (s *Executor) Name() string {
+ return name
+}
+
+func (s *Executor) PreRun() error {
+ s.log = logger.GetLogger(name)
return nil
}
+
+func (s Executor) Rev(message bus.Message) {
+ s.log.Info("rev", logger.Any("msg", message.Data()))
+ _ = s.publisher.Publish(storage.TraceIndex, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "index message"))
+ _ = s.publisher.Publish(storage.TraceData, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "data message"))
+}
diff --git a/banyand/index/index.go b/banyand/index/index.go
index d29eb89..496e145 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -19,26 +19,40 @@ package index
import (
"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+ "github.com/apache/skywalking-banyandb/banyand/storage"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
-var _ bus.MessageListener = (*Index)(nil)
+const name = "index"
+
+var (
+ _ bus.MessageListener = (*Index)(nil)
+ _ run.PreRunner = (*Index)(nil)
+ _ storage.DataSubscriber = (*Index)(nil)
+)
type Index struct {
log *logger.Logger
}
-func NewIndex() *Index {
- return &Index{
- log: logger.GetLogger("Index"),
- }
+func (s *Index) ComponentName() string {
+ return name
}
-func (s Index) Rev(message bus.Message) {
- s.log.Info("rev", logger.Any("msg", message.Data()))
+func (s *Index) Sub(subscriber bus.Subscriber) error {
+ return subscriber.Subscribe(storage.TraceIndex, s)
+}
+
+func (s Index) Name() string {
+ return name
}
-func (s Index) Close() error {
- s.log.Info("closed")
+func (s Index) PreRun() error {
+ s.log = logger.GetLogger(name)
return nil
}
+
+func (s Index) Rev(message bus.Message) {
+ s.log.Info("rev", logger.Any("msg", message.Data()))
+}
diff --git a/banyand/internal/bus/bus.go b/banyand/internal/bus/bus.go
index e2699fa..05f2065 100644
--- a/banyand/internal/bus/bus.go
+++ b/banyand/internal/bus/bus.go
@@ -19,10 +19,7 @@ package bus
import (
"errors"
- "io"
"sync"
-
- "go.uber.org/atomic"
)
// Payload represents a simple data
@@ -43,10 +40,17 @@ func NewMessage(id MessageID, data interface{}) Message {
return Message{id: id, payload: data}
}
-// EventListener is the signature of functions that can handle an EventMessage.
+//MessageListener is the signature of functions that can handle an EventMessage.
type MessageListener interface {
Rev(message Message)
- io.Closer
+}
+
+type Subscriber interface {
+ Subscribe(topic Topic, listener MessageListener) error
+}
+
+type Publisher interface {
+ Publish(topic Topic, message ...Message) error
}
type Channel chan Message
@@ -56,7 +60,6 @@ type Topic string
// The Bus allows publish-subscribe-style communication between components
type Bus struct {
topics map[Topic][]Channel
- closed atomic.Bool
mutex sync.RWMutex
}
@@ -70,16 +73,12 @@ var (
ErrTopicEmpty = errors.New("the topic is empty")
ErrTopicNotExist = errors.New("the topic does not exist")
ErrListenerEmpty = errors.New("the message listener is empty")
- ErrClosed = errors.New("the bus is closed")
)
func (b *Bus) Publish(topic Topic, message ...Message) error {
if topic == "" {
return ErrTopicEmpty
}
- if b.closed.Load() {
- return ErrClosed
- }
cc, exit := b.topics[topic]
if !exit {
return ErrTopicNotExist
@@ -104,9 +103,6 @@ func (b *Bus) Subscribe(topic Topic, listener MessageListener) error {
if listener == nil {
return ErrListenerEmpty
}
- if b.closed.Load() {
- return ErrClosed
- }
b.mutex.Lock()
defer b.mutex.Unlock()
if _, exist := b.topics[topic]; !exist {
@@ -122,7 +118,6 @@ func (b *Bus) Subscribe(topic Topic, listener MessageListener) error {
if ok {
listener.Rev(c)
} else {
- _ = listener.Close()
break
}
}
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 27ca159..8e0ea44 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -18,26 +18,26 @@
package cmd
import (
- "context"
"os"
- "os/signal"
- "syscall"
"github.com/spf13/cobra"
- "go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/banyand/config"
- "github.com/apache/skywalking-banyandb/banyand/executor"
- "github.com/apache/skywalking-banyandb/banyand/index"
- "github.com/apache/skywalking-banyandb/banyand/internal/bus"
- "github.com/apache/skywalking-banyandb/banyand/series"
- "github.com/apache/skywalking-banyandb/banyand/shard"
+ executor2 "github.com/apache/skywalking-banyandb/banyand/executor"
+ index2 "github.com/apache/skywalking-banyandb/banyand/index"
+ series2 "github.com/apache/skywalking-banyandb/banyand/series"
+ shard2 "github.com/apache/skywalking-banyandb/banyand/shard"
"github.com/apache/skywalking-banyandb/banyand/storage"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+ "github.com/apache/skywalking-banyandb/pkg/signal"
"github.com/apache/skywalking-banyandb/pkg/version"
)
-var standAloneConfig config.Standalone
+var (
+ standAloneConfig config.Standalone
+ g = run.Group{Name: "standalone"}
+)
func newStandaloneCmd() *cobra.Command {
standaloneCmd := &cobra.Command{
@@ -55,38 +55,38 @@ func newStandaloneCmd() *cobra.Command {
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
logger.GetLogger().Info("starting as a standalone server")
- dataBus := bus.NewBus()
- err = multierr.Append(err, dataBus.Subscribe(storage.TraceRaw, shard.NewShard(dataBus)))
- err = multierr.Append(err, dataBus.Subscribe(storage.TraceSharded, executor.NewExecutor(dataBus)))
- err = multierr.Append(err, dataBus.Subscribe(storage.TraceIndex, index.NewIndex()))
- err = multierr.Append(err, dataBus.Subscribe(storage.TraceData, series.NewSeries()))
- if err != nil {
- return err
- }
- if err = dataBus.Publish(storage.TraceRaw, bus.NewMessage(0, "initialization")); err != nil {
- return err
+ engine := new(storage.Pipeline)
+ shard := new(shard2.Shard)
+ executor := new(executor2.Executor)
+ index := new(index2.Index)
+ series := new(series2.Series)
+
+ // Register the storage engine components.
+ engine.Register(
+ shard,
+ executor,
+ index,
+ series,
+ )
+
+ // Register the run Group units.
+ g.Register(
+ new(signal.Handler),
+ engine,
+ shard,
+ executor,
+ index,
+ series,
+ )
+
+ // Spawn our go routines and wait for shutdown.
+ if err := g.Run(args...); err != nil {
+ logger.GetLogger().Error("exit: ", logger.String("name", g.Name), logger.Error(err))
+ os.Exit(-1)
}
- ctx := newContext()
- <-ctx.Done()
return nil
},
}
return standaloneCmd
}
-
-func newContext() context.Context {
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, syscall.SIGTERM)
- ctx, cancel := context.WithCancel(context.Background())
- go func() {
- defer cancel()
- select {
- case <-ctx.Done():
- return
- case <-c:
- return
- }
- }()
- return ctx
-}
diff --git a/banyand/series/series.go b/banyand/series/series.go
index a51a2a1..850e9b1 100644
--- a/banyand/series/series.go
+++ b/banyand/series/series.go
@@ -19,26 +19,40 @@ package series
import (
"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+ "github.com/apache/skywalking-banyandb/banyand/storage"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
-var _ bus.MessageListener = (*Series)(nil)
+const name = "series"
+
+var (
+ _ bus.MessageListener = (*Series)(nil)
+ _ run.PreRunner = (*Series)(nil)
+ _ storage.DataSubscriber = (*Series)(nil)
+)
type Series struct {
log *logger.Logger
}
-func NewSeries() *Series {
- return &Series{
- log: logger.GetLogger("series"),
- }
+func (s Series) ComponentName() string {
+ return name
}
-func (s Series) Rev(message bus.Message) {
- s.log.Info("rev", logger.Any("msg", message.Data()))
+func (s *Series) Sub(subscriber bus.Subscriber) error {
+ return subscriber.Subscribe(storage.TraceData, s)
+}
+
+func (s Series) Name() string {
+ return name
}
-func (s Series) Close() error {
- s.log.Info("closed")
+func (s *Series) PreRun() error {
+ s.log = logger.GetLogger(name)
return nil
}
+
+func (s Series) Rev(message bus.Message) {
+ s.log.Info("rev", logger.Any("msg", message.Data()))
+}
diff --git a/banyand/shard/shard.go b/banyand/shard/shard.go
index a25baff..24c191c 100644
--- a/banyand/shard/shard.go
+++ b/banyand/shard/shard.go
@@ -23,28 +23,45 @@ import (
"github.com/apache/skywalking-banyandb/banyand/internal/bus"
"github.com/apache/skywalking-banyandb/banyand/storage"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
-var _ bus.MessageListener = (*Shard)(nil)
+var (
+ _ bus.MessageListener = (*Shard)(nil)
+ _ run.PreRunner = (*Shard)(nil)
+ _ storage.DataSubscriber = (*Shard)(nil)
+ _ storage.DataPublisher = (*Shard)(nil)
+)
type Shard struct {
- log *logger.Logger
- bus *bus.Bus
+ log *logger.Logger
+ publisher bus.Publisher
}
-func NewShard(bus *bus.Bus) *Shard {
- return &Shard{
- bus: bus,
- log: logger.GetLogger("shard"),
- }
+func (s Shard) ComponentName() string {
+ return "shard"
}
-func (s Shard) Rev(message bus.Message) {
- s.log.Info("rev", logger.Any("msg", message.Data()))
- _ = s.bus.Publish(storage.TraceSharded, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "sharded message"))
+func (s *Shard) Pub(publisher bus.Publisher) error {
+ s.publisher = publisher
+ return nil
+}
+
+func (s *Shard) Sub(subscriber bus.Subscriber) error {
+ return subscriber.Subscribe(storage.TraceRaw, s)
}
-func (s Shard) Close() error {
- s.log.Info("closed")
+func (s *Shard) PreRun() error {
+ s.log = logger.GetLogger("shard")
+ s.log.Info("pre running")
return nil
}
+
+func (s *Shard) Name() string {
+ return "shard"
+}
+
+func (s Shard) Rev(message bus.Message) {
+ s.log.Info("rev", logger.Any("msg", message.Data()))
+ _ = s.publisher.Publish(storage.TraceSharded, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "sharded message"))
+}
diff --git a/banyand/storage/pipeline.go b/banyand/storage/pipeline.go
index eb30eab..6f51c84 100644
--- a/banyand/storage/pipeline.go
+++ b/banyand/storage/pipeline.go
@@ -17,9 +17,70 @@
package storage
+import (
+ "go.uber.org/multierr"
+
+ "github.com/apache/skywalking-banyandb/banyand/internal/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
const (
TraceRaw = "trace-raw"
TraceSharded = "trace-sharded"
TraceIndex = "trace-index"
TraceData = "trace-data"
)
+
+const name = "storage-engine"
+
+type Component interface {
+ ComponentName() string
+}
+
+type DataSubscriber interface {
+ Component
+ Sub(subscriber bus.Subscriber) error
+}
+
+type DataPublisher interface {
+ Component
+ Pub(publisher bus.Publisher) error
+}
+
+var _ run.PreRunner = (*Pipeline)(nil)
+
+type Pipeline struct {
+ logger *logger.Logger
+ dataBus *bus.Bus
+ dps []DataPublisher
+ dss []DataSubscriber
+}
+
+func (e Pipeline) Name() string {
+ return name
+}
+
+func (e *Pipeline) PreRun() error {
+ e.logger = logger.GetLogger(name)
+ var err error
+ e.dataBus = bus.NewBus()
+ for _, dp := range e.dps {
+ err = multierr.Append(err, dp.Pub(e.dataBus))
+ }
+ for _, ds := range e.dss {
+ err = multierr.Append(err, ds.Sub(e.dataBus))
+ }
+ return err
+}
+
+func (e *Pipeline) Register(component ...Component) {
+ for _, c := range component {
+ if ds, ok := c.(DataSubscriber); ok {
+ e.dss = append(e.dss, ds)
+ }
+ if ps, ok := c.(DataPublisher); ok {
+ e.dps = append(e.dps, ps)
+ }
+ }
+}
diff --git a/go.mod b/go.mod
index fd684ee..0927f71 100644
--- a/go.mod
+++ b/go.mod
@@ -3,10 +3,11 @@ module github.com/apache/skywalking-banyandb
go 1.16
require (
+ github.com/oklog/run v1.1.0
github.com/spf13/cobra v1.1.3
+ github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.4.0
- go.uber.org/atomic v1.7.0
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.16.0
)
diff --git a/go.sum b/go.sum
index 5d102e4..e426a98 100644
--- a/go.sum
+++ b/go.sum
@@ -128,6 +128,8 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
+github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 1429a5a..8616612 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -25,31 +25,7 @@ import (
// Logger is wrapper for zap logger with module, it is singleton.
type Logger struct {
module string
- logger *zap.Logger
-}
-
-// Debug logs a message at DebugLevel. The message includes any fields passed
-// at the log site, as well as any fields accumulated on the logger.
-func (l *Logger) Debug(msg string, fields ...zap.Field) {
- l.logger.Debug(msg, fields...)
-}
-
-// Info logs a message at InfoLevel. The message includes any fields passed
-// at the log site, as well as any fields accumulated on the logger.
-func (l *Logger) Info(msg string, fields ...zap.Field) {
- l.logger.Info(msg, fields...)
-}
-
-// Warn logs a message at WarnLevel. The message includes any fields passed
-// at the log site, as well as any fields accumulated on the logger.
-func (l *Logger) Warn(msg string, fields ...zap.Field) {
- l.logger.Warn(msg, fields...)
-}
-
-// Error logs a message at ErrorLevel. The message includes any fields passed
-// at the log site, as well as any fields accumulated on the logger.
-func (l *Logger) Error(msg string, fields ...zap.Field) {
- l.logger.Error(msg, fields...)
+ *zap.Logger
}
// String constructs a field with the given key and value.
diff --git a/pkg/logger/setting.go b/pkg/logger/setting.go
index 5fb7dbd..bae8827 100644
--- a/pkg/logger/setting.go
+++ b/pkg/logger/setting.go
@@ -38,7 +38,7 @@ func GetLogger(scope ...string) *Logger {
return root
}
module := strings.Join(scope, ".")
- return &Logger{module: module, logger: root.logger.Named(module)}
+ return &Logger{module: module, Logger: root.Logger.Named(module)}
}
// InitLogger initializes a zap logger from user config
@@ -73,5 +73,5 @@ func getLogger(cfg config.Logging) (*Logger, error) {
if err != nil {
return nil, err
}
- return &Logger{module: "root", logger: l}, nil
+ return &Logger{module: "root", Logger: l}, nil
}
diff --git a/pkg/logger/setting_test.go b/pkg/logger/setting_test.go
index d065af2..784ef4a 100644
--- a/pkg/logger/setting_test.go
+++ b/pkg/logger/setting_test.go
@@ -81,10 +81,10 @@ func TestInitLogger(t *testing.T) {
}
if err == nil {
assert.NotNil(t, logger)
- assert.NotNil(t, logger.logger)
+ assert.NotNil(t, logger.Logger)
assert.NotEmpty(t, logger.module)
- assert.Equal(t, tt.want.isDev, reflect.ValueOf(*logger.logger).FieldByName("development").Bool())
- assert.NotNil(t, logger.logger.Check(tt.want.level, "foo"))
+ assert.Equal(t, tt.want.isDev, reflect.ValueOf(*logger.Logger).FieldByName("development").Bool())
+ assert.NotNil(t, logger.Logger.Check(tt.want.level, "foo"))
}
})
}
diff --git a/pkg/run/run.go b/pkg/run/run.go
new file mode 100644
index 0000000..ccee525
--- /dev/null
+++ b/pkg/run/run.go
@@ -0,0 +1,391 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package run
+
+import (
+ "fmt"
+ "os"
+ "path"
+
+ "github.com/oklog/run"
+ "github.com/spf13/pflag"
+ "go.uber.org/multierr"
+
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// FlagSet holds a pflag.FlagSet as well as an exported Name variable for
+// allowing improved help usage information.
+type FlagSet struct {
+ *pflag.FlagSet
+ Name string
+}
+
+// NewFlagSet returns a new FlagSet for usage in Config objects.
+func NewFlagSet(name string) *FlagSet {
+ return &FlagSet{
+ FlagSet: pflag.NewFlagSet(name, pflag.ContinueOnError),
+ Name: name,
+ }
+}
+
+// Unit is the default interface an object needs to implement for it to be able
+// to register with a Group.
+// Name should return a short but good identifier of the Unit.
+type Unit interface {
+ Name() string
+}
+
+// Config interface should be implemented by Group Unit objects that manage
+// their own configuration through the use of flags.
+// If a Unit's Validate returns an error it will stop the Group immediately.
+type Config interface {
+ //Unit for Group registration and identification
+ Unit
+ //FlagSet returns an object's FlagSet
+ FlagSet() *FlagSet
+ //Validate checks an object's stored values
+ Validate() error
+}
+
+// PreRunner interface should be implemented by Group Unit objects that need
+// a pre run stage before starting the Group Services.
+// If a Unit's PreRun returns an error it will stop the Group immediately.
+type PreRunner interface {
+ //Unit for Group registration and identification
+ Unit
+ PreRun() error
+}
+
+// NewPreRunner takes a name and a standalone pre runner compatible function
+// and turns them into a Group compatible PreRunner, ready for registration.
+func NewPreRunner(name string, fn func() error) PreRunner {
+ return preRunner{name: name, fn: fn}
+}
+
+type preRunner struct {
+ name string
+ fn func() error
+}
+
+func (p preRunner) Name() string {
+ return p.name
+}
+
+func (p preRunner) PreRun() error {
+ return p.fn()
+}
+
+// Service interface should be implemented by Group Unit objects that need
+// to run a blocking service until an error occurs or a shutdown request is
+// made.
+// The Serve method must be blocking and return an error on unexpected shutdown.
+// Recoverable errors need to be handled inside the service itself.
+// GracefulStop must gracefully stop the service and make the Serve call return.
+//
+// Since Service is managed by Group, it is considered a design flaw to call any
+// of the Service methods directly in application code.
+type Service interface {
+ // Unit for Group registration and identification
+ Unit
+ // Serve starts the GroupService and blocks.
+ Serve() error
+ // GracefulStop shuts down and cleans up the GroupService.
+ GracefulStop()
+}
+
+// Group builds on https://github.com/oklog/run to provide a deterministic way
+// to manage service lifecycles. It allows for easy composition of elegant
+// monoliths as well as adding signal handlers, metrics services, etc.
+type Group struct {
+ // Name of the Group managed service. If omitted, the binaryname will be
+ // used as found at runtime.
+ Name string
+
+ f *FlagSet
+ r run.Group
+ c []Config
+ p []PreRunner
+ s []Service
+ log *logger.Logger
+
+ configured bool
+ hsRegistered bool
+}
+
+// Register will inspect the provided objects implementing the Unit interface to
+// see if it needs to register the objects for any of the Group bootstrap
+// phases. If a Unit doesn't satisfy any of the bootstrap phases it is ignored
+// by Group.
+// The returned array of booleans is of the same size as the amount of provided
+// Units, signalling for each provided Unit if it successfully registered with
+// Group for at least one of the bootstrap phases or if it was ignored.
+func (g *Group) Register(units ...Unit) []bool {
+ hasRegistered := make([]bool, len(units))
+ for idx := range units {
+ if !g.configured {
+ // if RunConfig has been called we can no longer register Config
+ // phases of Units
+ if c, ok := units[idx].(Config); ok {
+ g.c = append(g.c, c)
+ hasRegistered[idx] = true
+ }
+ }
+ if p, ok := units[idx].(PreRunner); ok {
+ g.p = append(g.p, p)
+ hasRegistered[idx] = true
+ }
+ if s, ok := units[idx].(Service); ok {
+ g.s = append(g.s, s)
+ hasRegistered[idx] = true
+ }
+ }
+ return hasRegistered
+}
+
+// RunConfig runs the Config phase of all registered Config aware Units.
+// Only use this function if needing to add additional wiring between config
+// and (pre)run phases and a separate PreRunner phase is not an option.
+// In most cases it is best to use the Run method directly as it will run the
+// Config phase prior to executing the PreRunner and Service phases.
+// If an error is returned the application must shut down as it is considered
+// fatal.
+func (g *Group) RunConfig(args ...string) (err error) {
+
+ g.configured = true
+
+ if g.Name == "" {
+ // use the binary name if custom name has not been provided
+ g.Name = path.Base(os.Args[0])
+ }
+
+ g.log = logger.GetLogger(g.Name)
+
+ defer func() {
+ if err != nil {
+ g.log.Error("unexpected exit", logger.Error(err))
+ }
+ }()
+
+ // run configuration stage
+ g.f = NewFlagSet(g.Name)
+ g.f.SortFlags = false // keep order of flag registration
+ g.f.Usage = func() {
+ fmt.Printf("Flags:\n")
+ g.f.PrintDefaults()
+ }
+
+ // register default rungroup flags
+ var (
+ name string
+ showVersion bool
+ showRunGroup bool
+ )
+
+ gFS := NewFlagSet("Common Service options")
+ gFS.SortFlags = false
+ gFS.StringVarP(&name, "name", "n", g.Name, `name of this service`)
+ gFS.BoolVarP(&showVersion, "version", "v", false,
+ "show version information and exit.")
+ gFS.BoolVar(&showRunGroup, "show-rungroup-units", false, "show rungroup units")
+ g.f.AddFlagSet(gFS.FlagSet)
+
+ // default to os.Args if args parameter was omitted
+ if len(args) == 0 {
+ args = os.Args[1:]
+ }
+
+ // parse our rungroup flags only (not the plugin ones)
+ _ = gFS.Parse(args)
+ if name != "" {
+ g.Name = name
+ }
+
+ // register flags from attached Config objects
+ fs := make([]*FlagSet, len(g.c))
+ for idx := range g.c {
+ // a Namer might have been deregistered
+ if g.c[idx] == nil {
+ continue
+ }
+ nameField := logger.String("name", g.c[idx].Name())
+ indexField := logger.Uint32("index", uint32(idx))
+ g.log.Debug("register flags", nameField, indexField,
+ logger.Uint32("total", uint32(len(g.c))))
+ fs[idx] = g.c[idx].FlagSet()
+ if fs[idx] == nil {
+ // no FlagSet returned
+ g.log.Debug("config object did not return a flagset", nameField)
+ continue
+ }
+ fs[idx].VisitAll(func(f *pflag.Flag) {
+ if g.f.Lookup(f.Name) != nil {
+ // log duplicate flag
+ g.log.Warn("ignoring duplicate flag", logger.String("name", f.Name), indexField)
+ return
+ }
+ g.f.AddFlag(f)
+ })
+ }
+
+ // parse FlagSet and exit on error
+ if err = g.f.Parse(args); err != nil {
+ return err
+ }
+
+ // bail early on help or version requests
+ switch {
+ case showRunGroup:
+ fmt.Println(g.ListUnits())
+ return nil
+ }
+
+ // Validate Config inputs
+ for idx := range g.c {
+ // a Config might have been deregistered during Run
+ indexField := logger.Uint32("index", uint32(idx))
+ if g.c[idx] == nil {
+ g.log.Debug("skipping validate", indexField)
+ continue
+ }
+ g.log.Debug("validate config: %s (%d/%d)", logger.String("name", g.c[idx].Name()), indexField,
+ logger.Uint32("total", uint32(len(g.c))))
+ if vErr := g.c[idx].Validate(); vErr != nil {
+ err = multierr.Append(err, vErr)
+ }
+ }
+
+ // exit on at least one Validate error
+ if err != nil {
+ return err
+ }
+
+ // log binary name and version
+ g.log.Info("started")
+
+ return nil
+}
+
+// Run will execute all phases of all registered Units and block until an error
+// occurs.
+// If RunConfig has been called prior to Run, the Group's Config phase will be
+// skipped and Run continues with the PreRunner and Service phases.
+//
+// The following phases are executed in the following sequence:
+//
+// Config phase (serially, in order of Unit registration)
+// - FlagSet() Get & register all FlagSets from Config Units.
+// - Flag Parsing Using the provided args (os.Args if empty)
+// - Validate() Validate Config Units. Exit on first error.
+//
+// PreRunner phase (serially, in order of Unit registration)
+// - PreRun() Execute PreRunner Units. Exit on first error.
+//
+// Service phase (concurrently)
+// - Serve() Execute all Service Units in separate Go routines.
+// - Wait Block until one of the Serve() methods returns
+// - GracefulStop() Call interrupt handlers of all Service Units.
+//
+// Run will return with the originating error on:
+// - first Config.Validate() returning an error
+// - first PreRunner.PreRun() returning an error
+// - first Service.Serve() returning (error or nil)
+//
+func (g *Group) Run(args ...string) (err error) {
+ // run config registration and flag parsing stages
+ if err = g.RunConfig(args...); err != nil {
+ return err
+ }
+ defer func() {
+ if err != nil {
+ g.log.Error("unexpected exit", logger.Error(err))
+ }
+ }()
+
+ // execute pre run stage and exit on error
+ for idx := range g.p {
+ // a PreRunner might have been deregistered during Run
+ if g.p[idx] == nil {
+ continue
+ }
+ g.log.Debug("pre-run:", logger.String("name", g.p[idx].Name()),
+ logger.Uint32("ran", uint32(idx+1)), logger.Uint32("total", uint32(len(g.p))))
+ if err := g.p[idx].PreRun(); err != nil {
+ return err
+ }
+ }
+
+ // feed our registered services to our internal run.Group
+ for idx := range g.s {
+ // a Service might have been deregistered during Run
+ s := g.s[idx]
+ if s == nil {
+ continue
+ }
+
+ g.log.Debug("serve:", logger.String("name", s.Name()),
+ logger.Uint32("ran", uint32(idx+1)), logger.Uint32("total", uint32(len(g.s))))
+ g.r.Add(func() error {
+ return s.Serve()
+ }, func(_ error) {
+ g.log.Debug("stop:", logger.String("name", s.Name()),
+ logger.Uint32("ran", uint32(idx+1)), logger.Uint32("total", uint32(len(g.s))))
+ s.GracefulStop()
+ })
+ }
+
+ // start registered services and block
+ return g.r.Run()
+}
+
+// ListUnits returns a list of all Group phases and the Units registered to each
+// of them.
+func (g Group) ListUnits() string {
+ var (
+ s string
+ t = "cli"
+ )
+
+ if len(g.c) > 0 {
+ s += "\n- config: "
+ for _, u := range g.c {
+ if u != nil {
+ s += u.Name() + " "
+ }
+ }
+ }
+ if len(g.p) > 0 {
+ s += "\n- prerun: "
+ for _, u := range g.p {
+ if u != nil {
+ s += u.Name() + " "
+ }
+ }
+ }
+ if len(g.s) > 0 {
+ s += "\n- serve : "
+ for _, u := range g.s {
+ if u != nil {
+ t = "svc"
+ s += u.Name() + " "
+ }
+ }
+ }
+
+ return fmt.Sprintf("Group: %s [%s]%s", g.Name, t, s)
+}
diff --git a/pkg/signal/handler.go b/pkg/signal/handler.go
new file mode 100644
index 0000000..df3d7a3
--- /dev/null
+++ b/pkg/signal/handler.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package signal
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "os/signal"
+ "syscall"
+
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// ErrSignal is returned when a termination signal is received.
+var ErrSignal = errors.New("signal received")
+var _ run.Service = (*Handler)(nil)
+
+// Handler implements a unix signal handler as run.GroupService.
+type Handler struct {
+ signal chan os.Signal
+ cancel chan struct{}
+}
+
+func (h *Handler) Name() string {
+ return "signal"
+}
+
+// PreRun implements run.PreRunner to initialize the handler.
+func (h *Handler) PreRun() error {
+ h.cancel = make(chan struct{})
+ h.signal = make(chan os.Signal, 1)
+ signal.Notify(h.signal,
+ syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
+ return nil
+}
+
+// Serve implements run.Service and listens for incoming unix signals.
+func (h *Handler) Serve() error {
+ for {
+ select {
+ case sig := <-h.signal:
+ return fmt.Errorf("%s %w", sig, ErrSignal)
+ case <-h.cancel:
+ signal.Stop(h.signal)
+ close(h.signal)
+ return nil
+ }
+ }
+}
+
+// GracefulStop implements run.GroupService and will close the signal handler.
+func (h *Handler) GracefulStop() {
+ close(h.cancel)
+}
diff --git a/pkg/version/version.go b/pkg/version/version.go
index 8ffaaf0..10dc5cb 100644
--- a/pkg/version/version.go
+++ b/pkg/version/version.go
@@ -19,6 +19,11 @@
// git branches and tags into the binary importing this package.
package version
+import (
+ "fmt"
+ "strings"
+)
+
// build is to be populated at build time using -ldflags -X.
var build string
@@ -26,3 +31,36 @@ var build string
func Build() string {
return build
}
+
+// Show the service's version information
+func Show(serviceName string) {
+ fmt.Println(serviceName + " " + Parse())
+}
+
+// Parse returns the parsed service's version information. (from raw git label)
+func Parse() string {
+ // versionString syntax:
+ // <release tag>-<commits since release tag>-g<commit hash>-<branch name>
+ v := strings.SplitN(build, "-", 4)
+ // prefix v on semantic versioning tags omitting it
+ // Go module tags should include the 'v'
+ if len(v[0]) > 1 && strings.ToLower(v[0])[0] != 'v' {
+ v[0] = "v" + v[0]
+ }
+ switch {
+ case len(v) != 4:
+ // built without using the make tooling
+ return "v0.0.0-unofficial"
+ case v[1] != "0":
+ // built from a non release commit point
+ // In the version string, the commit tag is prefixed with "-g" (which stands for "git").
+ // When printing the version string, remove that prefix to just show the real commit hash.
+ return fmt.Sprintf("%s-%s (%s, +%s)", v[0], v[3], v[2][1:], v[1])
+ case v[3] != "master":
+ // specific branch release build
+ return fmt.Sprintf("%s-%s", v[0], v[3])
+ default:
+ // master release build
+ return v[0]
+ }
+}