You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/04/14 11:43:22 UTC

[skywalking-banyandb] branch main updated: Introduce run module to control the lifecycle of components

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b6a6af  Introduce run module to control the lifecycle of components
8b6a6af is described below

commit 8b6a6afa108ec425211c8ef4b4f39bc81e634a6b
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Apr 14 19:39:57 2021 +0800

    Introduce run module to control the lifecycle of components
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/executor/executor.go       |  46 +++--
 banyand/index/index.go             |  32 ++-
 banyand/internal/bus/bus.go        |  23 +--
 banyand/internal/cmd/standalone.go |  76 +++----
 banyand/series/series.go           |  32 ++-
 banyand/shard/shard.go             |  43 ++--
 banyand/storage/pipeline.go        |  61 ++++++
 go.mod                             |   3 +-
 go.sum                             |   2 +
 pkg/logger/logger.go               |  26 +--
 pkg/logger/setting.go              |   4 +-
 pkg/logger/setting_test.go         |   6 +-
 pkg/run/run.go                     | 391 +++++++++++++++++++++++++++++++++++++
 pkg/signal/handler.go              |  70 +++++++
 pkg/version/version.go             |  38 ++++
 15 files changed, 725 insertions(+), 128 deletions(-)

diff --git a/banyand/executor/executor.go b/banyand/executor/executor.go
index ac25e9e..bedc415 100644
--- a/banyand/executor/executor.go
+++ b/banyand/executor/executor.go
@@ -23,29 +23,47 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
 	"github.com/apache/skywalking-banyandb/banyand/storage"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-var _ bus.MessageListener = (*Executor)(nil)
+const name = "executor"
+
+var (
+	_ bus.MessageListener    = (*Executor)(nil)
+	_ run.PreRunner          = (*Executor)(nil)
+	_ storage.DataSubscriber = (*Executor)(nil)
+	_ storage.DataPublisher  = (*Executor)(nil)
+)
 
 type Executor struct {
-	log *logger.Logger
-	bus *bus.Bus
+	log       *logger.Logger
+	publisher bus.Publisher
 }
 
-func NewExecutor(bus *bus.Bus) *Executor {
-	return &Executor{
-		bus: bus,
-		log: logger.GetLogger("executor"),
-	}
+func (s *Executor) Pub(publisher bus.Publisher) error {
+	s.publisher = publisher
+	return nil
 }
 
-func (s Executor) Rev(message bus.Message) {
-	s.log.Info("rev", logger.Any("msg", message.Data()))
-	_ = s.bus.Publish(storage.TraceIndex, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "index message"))
-	_ = s.bus.Publish(storage.TraceData, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "data message"))
+func (s *Executor) ComponentName() string {
+	return name
+}
+
+func (s *Executor) Sub(subscriber bus.Subscriber) error {
+	return subscriber.Subscribe(storage.TraceRaw, s)
 }
 
-func (s Executor) Close() error {
-	s.log.Info("closed")
+func (s *Executor) Name() string {
+	return name
+}
+
+func (s *Executor) PreRun() error {
+	s.log = logger.GetLogger(name)
 	return nil
 }
+
+func (s Executor) Rev(message bus.Message) {
+	s.log.Info("rev", logger.Any("msg", message.Data()))
+	_ = s.publisher.Publish(storage.TraceIndex, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "index message"))
+	_ = s.publisher.Publish(storage.TraceData, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "data message"))
+}
diff --git a/banyand/index/index.go b/banyand/index/index.go
index d29eb89..496e145 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -19,26 +19,40 @@ package index
 
 import (
 	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+	"github.com/apache/skywalking-banyandb/banyand/storage"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-var _ bus.MessageListener = (*Index)(nil)
+const name = "index"
+
+var (
+	_ bus.MessageListener    = (*Index)(nil)
+	_ run.PreRunner          = (*Index)(nil)
+	_ storage.DataSubscriber = (*Index)(nil)
+)
 
 type Index struct {
 	log *logger.Logger
 }
 
-func NewIndex() *Index {
-	return &Index{
-		log: logger.GetLogger("Index"),
-	}
+func (s *Index) ComponentName() string {
+	return name
 }
 
-func (s Index) Rev(message bus.Message) {
-	s.log.Info("rev", logger.Any("msg", message.Data()))
+func (s *Index) Sub(subscriber bus.Subscriber) error {
+	return subscriber.Subscribe(storage.TraceIndex, s)
+}
+
+func (s Index) Name() string {
+	return name
 }
 
-func (s Index) Close() error {
-	s.log.Info("closed")
+func (s Index) PreRun() error {
+	s.log = logger.GetLogger(name)
 	return nil
 }
+
+func (s Index) Rev(message bus.Message) {
+	s.log.Info("rev", logger.Any("msg", message.Data()))
+}
diff --git a/banyand/internal/bus/bus.go b/banyand/internal/bus/bus.go
index e2699fa..05f2065 100644
--- a/banyand/internal/bus/bus.go
+++ b/banyand/internal/bus/bus.go
@@ -19,10 +19,7 @@ package bus
 
 import (
 	"errors"
-	"io"
 	"sync"
-
-	"go.uber.org/atomic"
 )
 
 // Payload represents a simple data
@@ -43,10 +40,17 @@ func NewMessage(id MessageID, data interface{}) Message {
 	return Message{id: id, payload: data}
 }
 
-// EventListener is the signature of functions that can handle an EventMessage.
+//MessageListener is the signature of functions that can handle an EventMessage.
 type MessageListener interface {
 	Rev(message Message)
-	io.Closer
+}
+
+type Subscriber interface {
+	Subscribe(topic Topic, listener MessageListener) error
+}
+
+type Publisher interface {
+	Publish(topic Topic, message ...Message) error
 }
 
 type Channel chan Message
@@ -56,7 +60,6 @@ type Topic string
 // The Bus allows publish-subscribe-style communication between components
 type Bus struct {
 	topics map[Topic][]Channel
-	closed atomic.Bool
 	mutex  sync.RWMutex
 }
 
@@ -70,16 +73,12 @@ var (
 	ErrTopicEmpty    = errors.New("the topic is empty")
 	ErrTopicNotExist = errors.New("the topic does not exist")
 	ErrListenerEmpty = errors.New("the message listener is empty")
-	ErrClosed        = errors.New("the bus is closed")
 )
 
 func (b *Bus) Publish(topic Topic, message ...Message) error {
 	if topic == "" {
 		return ErrTopicEmpty
 	}
-	if b.closed.Load() {
-		return ErrClosed
-	}
 	cc, exit := b.topics[topic]
 	if !exit {
 		return ErrTopicNotExist
@@ -104,9 +103,6 @@ func (b *Bus) Subscribe(topic Topic, listener MessageListener) error {
 	if listener == nil {
 		return ErrListenerEmpty
 	}
-	if b.closed.Load() {
-		return ErrClosed
-	}
 	b.mutex.Lock()
 	defer b.mutex.Unlock()
 	if _, exist := b.topics[topic]; !exist {
@@ -122,7 +118,6 @@ func (b *Bus) Subscribe(topic Topic, listener MessageListener) error {
 			if ok {
 				listener.Rev(c)
 			} else {
-				_ = listener.Close()
 				break
 			}
 		}
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 27ca159..8e0ea44 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -18,26 +18,26 @@
 package cmd
 
 import (
-	"context"
 	"os"
-	"os/signal"
-	"syscall"
 
 	"github.com/spf13/cobra"
-	"go.uber.org/multierr"
 
 	"github.com/apache/skywalking-banyandb/banyand/config"
-	"github.com/apache/skywalking-banyandb/banyand/executor"
-	"github.com/apache/skywalking-banyandb/banyand/index"
-	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
-	"github.com/apache/skywalking-banyandb/banyand/series"
-	"github.com/apache/skywalking-banyandb/banyand/shard"
+	executor2 "github.com/apache/skywalking-banyandb/banyand/executor"
+	index2 "github.com/apache/skywalking-banyandb/banyand/index"
+	series2 "github.com/apache/skywalking-banyandb/banyand/series"
+	shard2 "github.com/apache/skywalking-banyandb/banyand/shard"
 	"github.com/apache/skywalking-banyandb/banyand/storage"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
+	"github.com/apache/skywalking-banyandb/pkg/signal"
 	"github.com/apache/skywalking-banyandb/pkg/version"
 )
 
-var standAloneConfig config.Standalone
+var (
+	standAloneConfig config.Standalone
+	g                = run.Group{Name: "standalone"}
+)
 
 func newStandaloneCmd() *cobra.Command {
 	standaloneCmd := &cobra.Command{
@@ -55,38 +55,38 @@ func newStandaloneCmd() *cobra.Command {
 		},
 		RunE: func(cmd *cobra.Command, args []string) (err error) {
 			logger.GetLogger().Info("starting as a standalone server")
-			dataBus := bus.NewBus()
-			err = multierr.Append(err, dataBus.Subscribe(storage.TraceRaw, shard.NewShard(dataBus)))
-			err = multierr.Append(err, dataBus.Subscribe(storage.TraceSharded, executor.NewExecutor(dataBus)))
-			err = multierr.Append(err, dataBus.Subscribe(storage.TraceIndex, index.NewIndex()))
-			err = multierr.Append(err, dataBus.Subscribe(storage.TraceData, series.NewSeries()))
-			if err != nil {
-				return err
-			}
-			if err = dataBus.Publish(storage.TraceRaw, bus.NewMessage(0, "initialization")); err != nil {
-				return err
+			engine := new(storage.Pipeline)
+			shard := new(shard2.Shard)
+			executor := new(executor2.Executor)
+			index := new(index2.Index)
+			series := new(series2.Series)
+
+			// Register the storage engine components.
+			engine.Register(
+				shard,
+				executor,
+				index,
+				series,
+			)
+
+			// Register the run Group units.
+			g.Register(
+				new(signal.Handler),
+				engine,
+				shard,
+				executor,
+				index,
+				series,
+			)
+
+			// Spawn our go routines and wait for shutdown.
+			if err := g.Run(args...); err != nil {
+				logger.GetLogger().Error("exit: ", logger.String("name", g.Name), logger.Error(err))
+				os.Exit(-1)
 			}
-			ctx := newContext()
-			<-ctx.Done()
 			return nil
 		},
 	}
 
 	return standaloneCmd
 }
-
-func newContext() context.Context {
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
-	ctx, cancel := context.WithCancel(context.Background())
-	go func() {
-		defer cancel()
-		select {
-		case <-ctx.Done():
-			return
-		case <-c:
-			return
-		}
-	}()
-	return ctx
-}
diff --git a/banyand/series/series.go b/banyand/series/series.go
index a51a2a1..850e9b1 100644
--- a/banyand/series/series.go
+++ b/banyand/series/series.go
@@ -19,26 +19,40 @@ package series
 
 import (
 	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+	"github.com/apache/skywalking-banyandb/banyand/storage"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-var _ bus.MessageListener = (*Series)(nil)
+const name = "series"
+
+var (
+	_ bus.MessageListener    = (*Series)(nil)
+	_ run.PreRunner          = (*Series)(nil)
+	_ storage.DataSubscriber = (*Series)(nil)
+)
 
 type Series struct {
 	log *logger.Logger
 }
 
-func NewSeries() *Series {
-	return &Series{
-		log: logger.GetLogger("series"),
-	}
+func (s Series) ComponentName() string {
+	return name
 }
 
-func (s Series) Rev(message bus.Message) {
-	s.log.Info("rev", logger.Any("msg", message.Data()))
+func (s *Series) Sub(subscriber bus.Subscriber) error {
+	return subscriber.Subscribe(storage.TraceData, s)
+}
+
+func (s Series) Name() string {
+	return name
 }
 
-func (s Series) Close() error {
-	s.log.Info("closed")
+func (s *Series) PreRun() error {
+	s.log = logger.GetLogger(name)
 	return nil
 }
+
+func (s Series) Rev(message bus.Message) {
+	s.log.Info("rev", logger.Any("msg", message.Data()))
+}
diff --git a/banyand/shard/shard.go b/banyand/shard/shard.go
index a25baff..24c191c 100644
--- a/banyand/shard/shard.go
+++ b/banyand/shard/shard.go
@@ -23,28 +23,45 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
 	"github.com/apache/skywalking-banyandb/banyand/storage"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-var _ bus.MessageListener = (*Shard)(nil)
+var (
+	_ bus.MessageListener    = (*Shard)(nil)
+	_ run.PreRunner          = (*Shard)(nil)
+	_ storage.DataSubscriber = (*Shard)(nil)
+	_ storage.DataPublisher  = (*Shard)(nil)
+)
 
 type Shard struct {
-	log *logger.Logger
-	bus *bus.Bus
+	log       *logger.Logger
+	publisher bus.Publisher
 }
 
-func NewShard(bus *bus.Bus) *Shard {
-	return &Shard{
-		bus: bus,
-		log: logger.GetLogger("shard"),
-	}
+func (s Shard) ComponentName() string {
+	return "shard"
 }
 
-func (s Shard) Rev(message bus.Message) {
-	s.log.Info("rev", logger.Any("msg", message.Data()))
-	_ = s.bus.Publish(storage.TraceSharded, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "sharded message"))
+func (s *Shard) Pub(publisher bus.Publisher) error {
+	s.publisher = publisher
+	return nil
+}
+
+func (s *Shard) Sub(subscriber bus.Subscriber) error {
+	return subscriber.Subscribe(storage.TraceRaw, s)
 }
 
-func (s Shard) Close() error {
-	s.log.Info("closed")
+func (s *Shard) PreRun() error {
+	s.log = logger.GetLogger("shard")
+	s.log.Info("pre running")
 	return nil
 }
+
+func (s *Shard) Name() string {
+	return "shard"
+}
+
+func (s Shard) Rev(message bus.Message) {
+	s.log.Info("rev", logger.Any("msg", message.Data()))
+	_ = s.publisher.Publish(storage.TraceSharded, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "sharded message"))
+}
diff --git a/banyand/storage/pipeline.go b/banyand/storage/pipeline.go
index eb30eab..6f51c84 100644
--- a/banyand/storage/pipeline.go
+++ b/banyand/storage/pipeline.go
@@ -17,9 +17,70 @@
 
 package storage
 
+import (
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
+)
+
 const (
 	TraceRaw     = "trace-raw"
 	TraceSharded = "trace-sharded"
 	TraceIndex   = "trace-index"
 	TraceData    = "trace-data"
 )
+
+const name = "storage-engine"
+
+type Component interface {
+	ComponentName() string
+}
+
+type DataSubscriber interface {
+	Component
+	Sub(subscriber bus.Subscriber) error
+}
+
+type DataPublisher interface {
+	Component
+	Pub(publisher bus.Publisher) error
+}
+
+var _ run.PreRunner = (*Pipeline)(nil)
+
+type Pipeline struct {
+	logger  *logger.Logger
+	dataBus *bus.Bus
+	dps     []DataPublisher
+	dss     []DataSubscriber
+}
+
+func (e Pipeline) Name() string {
+	return name
+}
+
+func (e *Pipeline) PreRun() error {
+	e.logger = logger.GetLogger(name)
+	var err error
+	e.dataBus = bus.NewBus()
+	for _, dp := range e.dps {
+		err = multierr.Append(err, dp.Pub(e.dataBus))
+	}
+	for _, ds := range e.dss {
+		err = multierr.Append(err, ds.Sub(e.dataBus))
+	}
+	return err
+}
+
+func (e *Pipeline) Register(component ...Component) {
+	for _, c := range component {
+		if ds, ok := c.(DataSubscriber); ok {
+			e.dss = append(e.dss, ds)
+		}
+		if ps, ok := c.(DataPublisher); ok {
+			e.dps = append(e.dps, ps)
+		}
+	}
+}
diff --git a/go.mod b/go.mod
index fd684ee..0927f71 100644
--- a/go.mod
+++ b/go.mod
@@ -3,10 +3,11 @@ module github.com/apache/skywalking-banyandb
 go 1.16
 
 require (
+	github.com/oklog/run v1.1.0
 	github.com/spf13/cobra v1.1.3
+	github.com/spf13/pflag v1.0.5
 	github.com/spf13/viper v1.7.1
 	github.com/stretchr/testify v1.4.0
-	go.uber.org/atomic v1.7.0
 	go.uber.org/multierr v1.6.0
 	go.uber.org/zap v1.16.0
 )
diff --git a/go.sum b/go.sum
index 5d102e4..e426a98 100644
--- a/go.sum
+++ b/go.sum
@@ -128,6 +128,8 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
+github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
 github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 1429a5a..8616612 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -25,31 +25,7 @@ import (
 // Logger is wrapper for zap logger with module, it is singleton.
 type Logger struct {
 	module string
-	logger *zap.Logger
-}
-
-// Debug logs a message at DebugLevel. The message includes any fields passed
-// at the log site, as well as any fields accumulated on the logger.
-func (l *Logger) Debug(msg string, fields ...zap.Field) {
-	l.logger.Debug(msg, fields...)
-}
-
-// Info logs a message at InfoLevel. The message includes any fields passed
-// at the log site, as well as any fields accumulated on the logger.
-func (l *Logger) Info(msg string, fields ...zap.Field) {
-	l.logger.Info(msg, fields...)
-}
-
-// Warn logs a message at WarnLevel. The message includes any fields passed
-// at the log site, as well as any fields accumulated on the logger.
-func (l *Logger) Warn(msg string, fields ...zap.Field) {
-	l.logger.Warn(msg, fields...)
-}
-
-// Error logs a message at ErrorLevel. The message includes any fields passed
-// at the log site, as well as any fields accumulated on the logger.
-func (l *Logger) Error(msg string, fields ...zap.Field) {
-	l.logger.Error(msg, fields...)
+	*zap.Logger
 }
 
 // String constructs a field with the given key and value.
diff --git a/pkg/logger/setting.go b/pkg/logger/setting.go
index 5fb7dbd..bae8827 100644
--- a/pkg/logger/setting.go
+++ b/pkg/logger/setting.go
@@ -38,7 +38,7 @@ func GetLogger(scope ...string) *Logger {
 		return root
 	}
 	module := strings.Join(scope, ".")
-	return &Logger{module: module, logger: root.logger.Named(module)}
+	return &Logger{module: module, Logger: root.Logger.Named(module)}
 }
 
 // InitLogger initializes a zap logger from user config
@@ -73,5 +73,5 @@ func getLogger(cfg config.Logging) (*Logger, error) {
 	if err != nil {
 		return nil, err
 	}
-	return &Logger{module: "root", logger: l}, nil
+	return &Logger{module: "root", Logger: l}, nil
 }
diff --git a/pkg/logger/setting_test.go b/pkg/logger/setting_test.go
index d065af2..784ef4a 100644
--- a/pkg/logger/setting_test.go
+++ b/pkg/logger/setting_test.go
@@ -81,10 +81,10 @@ func TestInitLogger(t *testing.T) {
 			}
 			if err == nil {
 				assert.NotNil(t, logger)
-				assert.NotNil(t, logger.logger)
+				assert.NotNil(t, logger.Logger)
 				assert.NotEmpty(t, logger.module)
-				assert.Equal(t, tt.want.isDev, reflect.ValueOf(*logger.logger).FieldByName("development").Bool())
-				assert.NotNil(t, logger.logger.Check(tt.want.level, "foo"))
+				assert.Equal(t, tt.want.isDev, reflect.ValueOf(*logger.Logger).FieldByName("development").Bool())
+				assert.NotNil(t, logger.Logger.Check(tt.want.level, "foo"))
 			}
 		})
 	}
diff --git a/pkg/run/run.go b/pkg/run/run.go
new file mode 100644
index 0000000..ccee525
--- /dev/null
+++ b/pkg/run/run.go
@@ -0,0 +1,391 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package run
+
+import (
+	"fmt"
+	"os"
+	"path"
+
+	"github.com/oklog/run"
+	"github.com/spf13/pflag"
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// FlagSet holds a pflag.FlagSet as well as an exported Name variable for
+// allowing improved help usage information.
+type FlagSet struct {
+	*pflag.FlagSet
+	Name string
+}
+
+// NewFlagSet returns a new FlagSet for usage in Config objects.
+func NewFlagSet(name string) *FlagSet {
+	return &FlagSet{
+		FlagSet: pflag.NewFlagSet(name, pflag.ContinueOnError),
+		Name:    name,
+	}
+}
+
+// Unit is the default interface an object needs to implement for it to be able
+// to register with a Group.
+// Name should return a short but good identifier of the Unit.
+type Unit interface {
+	Name() string
+}
+
+// Config interface should be implemented by Group Unit objects that manage
+// their own configuration through the use of flags.
+// If a Unit's Validate returns an error it will stop the Group immediately.
+type Config interface {
+	//Unit for Group registration and identification
+	Unit
+	//FlagSet returns an object's FlagSet
+	FlagSet() *FlagSet
+	//Validate checks an object's stored values
+	Validate() error
+}
+
+// PreRunner interface should be implemented by Group Unit objects that need
+// a pre run stage before starting the Group Services.
+// If a Unit's PreRun returns an error it will stop the Group immediately.
+type PreRunner interface {
+	//Unit for Group registration and identification
+	Unit
+	PreRun() error
+}
+
+// NewPreRunner takes a name and a standalone pre runner compatible function
+// and turns them into a Group compatible PreRunner, ready for registration.
+func NewPreRunner(name string, fn func() error) PreRunner {
+	return preRunner{name: name, fn: fn}
+}
+
+type preRunner struct {
+	name string
+	fn   func() error
+}
+
+func (p preRunner) Name() string {
+	return p.name
+}
+
+func (p preRunner) PreRun() error {
+	return p.fn()
+}
+
+// Service interface should be implemented by Group Unit objects that need
+// to run a blocking service until an error occurs or a shutdown request is
+// made.
+// The Serve method must be blocking and return an error on unexpected shutdown.
+// Recoverable errors need to be handled inside the service itself.
+// GracefulStop must gracefully stop the service and make the Serve call return.
+//
+// Since Service is managed by Group, it is considered a design flaw to call any
+// of the Service methods directly in application code.
+type Service interface {
+	// Unit for Group registration and identification
+	Unit
+	// Serve starts the GroupService and blocks.
+	Serve() error
+	// GracefulStop shuts down and cleans up the GroupService.
+	GracefulStop()
+}
+
+// Group builds on https://github.com/oklog/run to provide a deterministic way
+// to manage service lifecycles. It allows for easy composition of elegant
+// monoliths as well as adding signal handlers, metrics services, etc.
+type Group struct {
+	// Name of the Group managed service. If omitted, the binaryname will be
+	// used as found at runtime.
+	Name string
+
+	f   *FlagSet
+	r   run.Group
+	c   []Config
+	p   []PreRunner
+	s   []Service
+	log *logger.Logger
+
+	configured   bool
+	hsRegistered bool
+}
+
+// Register will inspect the provided objects implementing the Unit interface to
+// see if it needs to register the objects for any of the Group bootstrap
+// phases. If a Unit doesn't satisfy any of the bootstrap phases it is ignored
+// by Group.
+// The returned array of booleans is of the same size as the amount of provided
+// Units, signalling for each provided Unit if it successfully registered with
+// Group for at least one of the bootstrap phases or if it was ignored.
+func (g *Group) Register(units ...Unit) []bool {
+	hasRegistered := make([]bool, len(units))
+	for idx := range units {
+		if !g.configured {
+			// if RunConfig has been called we can no longer register Config
+			// phases of Units
+			if c, ok := units[idx].(Config); ok {
+				g.c = append(g.c, c)
+				hasRegistered[idx] = true
+			}
+		}
+		if p, ok := units[idx].(PreRunner); ok {
+			g.p = append(g.p, p)
+			hasRegistered[idx] = true
+		}
+		if s, ok := units[idx].(Service); ok {
+			g.s = append(g.s, s)
+			hasRegistered[idx] = true
+		}
+	}
+	return hasRegistered
+}
+
+// RunConfig runs the Config phase of all registered Config aware Units.
+// Only use this function if needing to add additional wiring between config
+// and (pre)run phases and a separate PreRunner phase is not an option.
+// In most cases it is best to use the Run method directly as it will run the
+// Config phase prior to executing the PreRunner and Service phases.
+// If an error is returned the application must shut down as it is considered
+// fatal.
+func (g *Group) RunConfig(args ...string) (err error) {
+
+	g.configured = true
+
+	if g.Name == "" {
+		// use the binary name if custom name has not been provided
+		g.Name = path.Base(os.Args[0])
+	}
+
+	g.log = logger.GetLogger(g.Name)
+
+	defer func() {
+		if err != nil {
+			g.log.Error("unexpected exit", logger.Error(err))
+		}
+	}()
+
+	// run configuration stage
+	g.f = NewFlagSet(g.Name)
+	g.f.SortFlags = false // keep order of flag registration
+	g.f.Usage = func() {
+		fmt.Printf("Flags:\n")
+		g.f.PrintDefaults()
+	}
+
+	// register default rungroup flags
+	var (
+		name         string
+		showVersion  bool
+		showRunGroup bool
+	)
+
+	gFS := NewFlagSet("Common Service options")
+	gFS.SortFlags = false
+	gFS.StringVarP(&name, "name", "n", g.Name, `name of this service`)
+	gFS.BoolVarP(&showVersion, "version", "v", false,
+		"show version information and exit.")
+	gFS.BoolVar(&showRunGroup, "show-rungroup-units", false, "show rungroup units")
+	g.f.AddFlagSet(gFS.FlagSet)
+
+	// default to os.Args if args parameter was omitted
+	if len(args) == 0 {
+		args = os.Args[1:]
+	}
+
+	// parse our rungroup flags only (not the plugin ones)
+	_ = gFS.Parse(args)
+	if name != "" {
+		g.Name = name
+	}
+
+	// register flags from attached Config objects
+	fs := make([]*FlagSet, len(g.c))
+	for idx := range g.c {
+		// a Namer might have been deregistered
+		if g.c[idx] == nil {
+			continue
+		}
+		nameField := logger.String("name", g.c[idx].Name())
+		indexField := logger.Uint32("index", uint32(idx))
+		g.log.Debug("register flags", nameField, indexField,
+			logger.Uint32("total", uint32(len(g.c))))
+		fs[idx] = g.c[idx].FlagSet()
+		if fs[idx] == nil {
+			// no FlagSet returned
+			g.log.Debug("config object did not return a flagset", nameField)
+			continue
+		}
+		fs[idx].VisitAll(func(f *pflag.Flag) {
+			if g.f.Lookup(f.Name) != nil {
+				// log duplicate flag
+				g.log.Warn("ignoring duplicate flag", logger.String("name", f.Name), indexField)
+				return
+			}
+			g.f.AddFlag(f)
+		})
+	}
+
+	// parse FlagSet and exit on error
+	if err = g.f.Parse(args); err != nil {
+		return err
+	}
+
+	// bail early on help or version requests
+	switch {
+	case showRunGroup:
+		fmt.Println(g.ListUnits())
+		return nil
+	}
+
+	// Validate Config inputs
+	for idx := range g.c {
+		// a Config might have been deregistered during Run
+		indexField := logger.Uint32("index", uint32(idx))
+		if g.c[idx] == nil {
+			g.log.Debug("skipping validate", indexField)
+			continue
+		}
+		g.log.Debug("validate config: %s (%d/%d)", logger.String("name", g.c[idx].Name()), indexField,
+			logger.Uint32("total", uint32(len(g.c))))
+		if vErr := g.c[idx].Validate(); vErr != nil {
+			err = multierr.Append(err, vErr)
+		}
+	}
+
+	// exit on at least one Validate error
+	if err != nil {
+		return err
+	}
+
+	// log binary name and version
+	g.log.Info("started")
+
+	return nil
+}
+
+// Run will execute all phases of all registered Units and block until an error
+// occurs.
+// If RunConfig has been called prior to Run, the Group's Config phase will be
+// skipped and Run continues with the PreRunner and Service phases.
+//
+// The following phases are executed in the following sequence:
+//
+//   Config phase (serially, in order of Unit registration)
+//     - FlagSet()        Get & register all FlagSets from Config Units.
+//     - Flag Parsing     Using the provided args (os.Args if empty)
+//     - Validate()       Validate Config Units. Exit on first error.
+//
+//   PreRunner phase (serially, in order of Unit registration)
+//     - PreRun()         Execute PreRunner Units. Exit on first error.
+//
+//   Service phase (concurrently)
+//     - Serve()          Execute all Service Units in separate Go routines.
+//     - Wait             Block until one of the Serve() methods returns
+//     - GracefulStop()   Call interrupt handlers of all Service Units.
+//
+//   Run will return with the originating error on:
+//   - first Config.Validate()  returning an error
+//   - first PreRunner.PreRun() returning an error
+//   - first Service.Serve()    returning (error or nil)
+//
+func (g *Group) Run(args ...string) (err error) {
+	// run config registration and flag parsing stages
+	if err = g.RunConfig(args...); err != nil {
+		return err
+	}
+	defer func() {
+		if err != nil {
+			g.log.Error("unexpected exit", logger.Error(err))
+		}
+	}()
+
+	// execute pre run stage and exit on error
+	for idx := range g.p {
+		// a PreRunner might have been deregistered during Run
+		if g.p[idx] == nil {
+			continue
+		}
+		g.log.Debug("pre-run:", logger.String("name", g.p[idx].Name()),
+			logger.Uint32("ran", uint32(idx+1)), logger.Uint32("total", uint32(len(g.p))))
+		if err := g.p[idx].PreRun(); err != nil {
+			return err
+		}
+	}
+
+	// feed our registered services to our internal run.Group
+	for idx := range g.s {
+		// a Service might have been deregistered during Run
+		s := g.s[idx]
+		if s == nil {
+			continue
+		}
+
+		g.log.Debug("serve:", logger.String("name", s.Name()),
+			logger.Uint32("ran", uint32(idx+1)), logger.Uint32("total", uint32(len(g.s))))
+		g.r.Add(func() error {
+			return s.Serve()
+		}, func(_ error) {
+			g.log.Debug("stop:", logger.String("name", s.Name()),
+				logger.Uint32("ran", uint32(idx+1)), logger.Uint32("total", uint32(len(g.s))))
+			s.GracefulStop()
+		})
+	}
+
+	// start registered services and block
+	return g.r.Run()
+}
+
+// ListUnits returns a list of all Group phases and the Units registered to each
+// of them.
+func (g Group) ListUnits() string {
+	var (
+		s string
+		t = "cli"
+	)
+
+	if len(g.c) > 0 {
+		s += "\n- config: "
+		for _, u := range g.c {
+			if u != nil {
+				s += u.Name() + " "
+			}
+		}
+	}
+	if len(g.p) > 0 {
+		s += "\n- prerun: "
+		for _, u := range g.p {
+			if u != nil {
+				s += u.Name() + " "
+			}
+		}
+	}
+	if len(g.s) > 0 {
+		s += "\n- serve : "
+		for _, u := range g.s {
+			if u != nil {
+				t = "svc"
+				s += u.Name() + " "
+			}
+		}
+	}
+
+	return fmt.Sprintf("Group: %s [%s]%s", g.Name, t, s)
+}
diff --git a/pkg/signal/handler.go b/pkg/signal/handler.go
new file mode 100644
index 0000000..df3d7a3
--- /dev/null
+++ b/pkg/signal/handler.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package signal
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"os/signal"
+	"syscall"
+
+	"github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// ErrSignal is returned when a termination signal is received.
+var ErrSignal = errors.New("signal received")
+var _ run.Service = (*Handler)(nil)
+
+// Handler implements a unix signal handler as run.GroupService.
+type Handler struct {
+	signal chan os.Signal
+	cancel chan struct{}
+}
+
+func (h *Handler) Name() string {
+	return "signal"
+}
+
+// PreRun implements run.PreRunner to initialize the handler.
+func (h *Handler) PreRun() error {
+	h.cancel = make(chan struct{})
+	h.signal = make(chan os.Signal, 1)
+	signal.Notify(h.signal,
+		syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
+	return nil
+}
+
+// Serve implements run.Service and listens for incoming unix signals.
+func (h *Handler) Serve() error {
+	for {
+		select {
+		case sig := <-h.signal:
+			return fmt.Errorf("%s %w", sig, ErrSignal)
+		case <-h.cancel:
+			signal.Stop(h.signal)
+			close(h.signal)
+			return nil
+		}
+	}
+}
+
+// GracefulStop implements run.GroupService and will close the signal handler.
+func (h *Handler) GracefulStop() {
+	close(h.cancel)
+}
diff --git a/pkg/version/version.go b/pkg/version/version.go
index 8ffaaf0..10dc5cb 100644
--- a/pkg/version/version.go
+++ b/pkg/version/version.go
@@ -19,6 +19,11 @@
 // git branches and tags into the binary importing this package.
 package version
 
+import (
+	"fmt"
+	"strings"
+)
+
 // build is to be populated at build time using -ldflags -X.
 var build string
 
@@ -26,3 +31,36 @@ var build string
 func Build() string {
 	return build
 }
+
+// Show the service's version information
+func Show(serviceName string) {
+	fmt.Println(serviceName + " " + Parse())
+}
+
+// Parse returns the parsed service's version information. (from raw git label)
+func Parse() string {
+	// versionString syntax:
+	//   <release tag>-<commits since release tag>-g<commit hash>-<branch name>
+	v := strings.SplitN(build, "-", 4)
+	// prefix v on semantic versioning tags omitting it
+	// Go module tags should include the 'v'
+	if len(v[0]) > 1 && strings.ToLower(v[0])[0] != 'v' {
+		v[0] = "v" + v[0]
+	}
+	switch {
+	case len(v) != 4:
+		// built without using the make tooling
+		return "v0.0.0-unofficial"
+	case v[1] != "0":
+		// built from a non release commit point
+		// In the version string, the commit tag is prefixed with "-g" (which stands for "git").
+		// When printing the version string, remove that prefix to just show the real commit hash.
+		return fmt.Sprintf("%s-%s (%s, +%s)", v[0], v[3], v[2][1:], v[1])
+	case v[3] != "master":
+		// specific branch release build
+		return fmt.Sprintf("%s-%s", v[0], v[3])
+	default:
+		// master release build
+		return v[0]
+	}
+}