You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/04/20 06:19:59 UTC
[skywalking-banyandb] branch main updated: Improve config system
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 8df75ef Improve config system
new cf0b607 Merge branch 'main' of github.com:apache/skywalking-banyandb into main
8df75ef is described below
commit 8df75ef3af1f4621e0ab0ee003afa3c56af952ac
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Apr 20 14:16:42 2021 +0800
Improve config system
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/config/default/standalone.yaml | 20 ------
banyand/config/standalone.go | 45 --------------
banyand/internal/cmd/standalone.go | 70 ++++++++++-----------
banyand/storage/pipeline.go | 15 ++++-
pkg/config/config.go | 81 +++++++++++++++++++++----
pkg/config/logging.go | 23 -------
pkg/logger/logger.go | 6 ++
pkg/logger/setting.go | 22 +++++--
pkg/run/run.go | 107 ++++++++++++++-------------------
9 files changed, 188 insertions(+), 201 deletions(-)
diff --git a/banyand/config/default/standalone.yaml b/banyand/config/default/standalone.yaml
deleted file mode 100644
index 33d0cc6..0000000
--- a/banyand/config/default/standalone.yaml
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to Apache Software Foundation (ASF) under one or more contributor
-# license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright
-# ownership. Apache Software Foundation (ASF) licenses this file to you under
-# the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-logging:
- env: dev
- level: info
diff --git a/banyand/config/standalone.go b/banyand/config/standalone.go
deleted file mode 100644
index 5b5c502..0000000
--- a/banyand/config/standalone.go
+++ /dev/null
@@ -1,45 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package config
-
-import (
- _ "embed"
- "fmt"
-
- "github.com/apache/skywalking-banyandb/pkg/config"
-)
-
-//go:embed default/standalone.yaml
-var standaloneDefault string
-
-type Standalone struct {
- Logging config.Logging
-}
-
-func Load() (Standalone, error) {
- var c *config.Config
- var err error
- var s Standalone
- if c, err = config.NewConfig(standaloneDefault); err != nil {
- return s, fmt.Errorf("failed to initialize config system:%v", err)
- }
- if err = c.Unmarshal(&s); err != nil {
- return s, fmt.Errorf("failed to unmarshal config to standalone config")
- }
- return s, nil
-}
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 8e0ea44..60653cc 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -21,13 +21,14 @@ import (
"os"
"github.com/spf13/cobra"
+ "go.uber.org/zap"
- "github.com/apache/skywalking-banyandb/banyand/config"
executor2 "github.com/apache/skywalking-banyandb/banyand/executor"
index2 "github.com/apache/skywalking-banyandb/banyand/index"
series2 "github.com/apache/skywalking-banyandb/banyand/series"
shard2 "github.com/apache/skywalking-banyandb/banyand/shard"
"github.com/apache/skywalking-banyandb/banyand/storage"
+ "github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/signal"
@@ -35,58 +36,59 @@ import (
)
var (
- standAloneConfig config.Standalone
- g = run.Group{Name: "standalone"}
+ g = run.Group{Name: "standalone"}
)
func newStandaloneCmd() *cobra.Command {
+ _ = logger.Bootstrap()
+ engine := new(storage.Pipeline)
+ shard := new(shard2.Shard)
+ executor := new(executor2.Executor)
+ index := new(index2.Index)
+ series := new(series2.Series)
+
+ // Register the storage engine components.
+ engine.Register(
+ shard,
+ executor,
+ index,
+ series,
+ )
+
+ // Register the run Group units.
+ g.Register(
+ new(signal.Handler),
+ engine,
+ shard,
+ executor,
+ index,
+ series,
+ )
+ logging := logger.Logging{}
standaloneCmd := &cobra.Command{
Use: "standalone",
Version: version.Build(),
Short: "Run as the standalone mode",
PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) {
- if standAloneConfig, err = config.Load(); err != nil {
- return err
- }
- if err = logger.InitLogger(standAloneConfig.Logging); err != nil {
+ if err = config.Load("logging", cmd.Flags()); err != nil {
return err
}
- return nil
+ return logger.Init(logging)
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
logger.GetLogger().Info("starting as a standalone server")
- engine := new(storage.Pipeline)
- shard := new(shard2.Shard)
- executor := new(executor2.Executor)
- index := new(index2.Index)
- series := new(series2.Series)
-
- // Register the storage engine components.
- engine.Register(
- shard,
- executor,
- index,
- series,
- )
-
- // Register the run Group units.
- g.Register(
- new(signal.Handler),
- engine,
- shard,
- executor,
- index,
- series,
- )
-
// Spawn our go routines and wait for shutdown.
- if err := g.Run(args...); err != nil {
- logger.GetLogger().Error("exit: ", logger.String("name", g.Name), logger.Error(err))
+ if err := g.Run(); err != nil {
+ logger.GetLogger().WithOptions(zap.AddStacktrace(zap.FatalLevel)).
+ Error("exit: ", logger.String("name", g.Name), logger.Error(err))
os.Exit(-1)
}
return nil
},
}
+ standaloneCmd.Flags().StringVarP(&logging.Env, "logging.env", "", "dev", "the logging")
+ standaloneCmd.Flags().StringVarP(&logging.Level, "logging.level", "", "debug", "the level of logging")
+ standaloneCmd.Flags().AddFlagSet(g.RegisterFlags().FlagSet)
return standaloneCmd
}
diff --git a/banyand/storage/pipeline.go b/banyand/storage/pipeline.go
index 6f51c84..494f34f 100644
--- a/banyand/storage/pipeline.go
+++ b/banyand/storage/pipeline.go
@@ -49,20 +49,33 @@ type DataPublisher interface {
}
var _ run.PreRunner = (*Pipeline)(nil)
+var _ run.Config = (*Pipeline)(nil)
type Pipeline struct {
logger *logger.Logger
+ test string
dataBus *bus.Bus
dps []DataPublisher
dss []DataSubscriber
}
+func (e *Pipeline) FlagSet() *run.FlagSet {
+ e.logger = logger.GetLogger(name)
+ fs := run.NewFlagSet("storage")
+ fs.StringVarP(&e.test, "storage.test", "", "a", "test config")
+ return fs
+}
+
+func (e *Pipeline) Validate() error {
+ e.logger.Info("test", logger.String("val", e.test))
+ return nil
+}
+
func (e Pipeline) Name() string {
return name
}
func (e *Pipeline) PreRun() error {
- e.logger = logger.GetLogger(name)
var err error
e.dataBus = bus.NewBus()
for _, dp := range e.dps {
diff --git a/pkg/config/config.go b/pkg/config/config.go
index b6d8f8d..1c71ce2 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -21,26 +21,83 @@ import (
"fmt"
"strings"
+ "github.com/spf13/pflag"
"github.com/spf13/viper"
+ "go.uber.org/multierr"
)
-type Config struct {
- viper *viper.Viper
-}
+const (
+ // The environment variable prefix of all environment variables bound to our command line flags.
+ envPrefix = "BYDB"
+)
-func (c *Config) Unmarshal(config interface{}) error {
- return c.viper.Unmarshal(config)
+type config struct {
+ name string
+ viper *viper.Viper
}
-func NewConfig(defaultConfig string) (*Config, error) {
- c := new(Config)
+func Load(name string, fs *pflag.FlagSet) error {
+ c := new(config)
v := viper.New()
+ c.name = name
c.viper = v
- v.SetConfigType("yaml")
- if err := v.ReadConfig(strings.NewReader(defaultConfig)); err != nil {
- return nil, fmt.Errorf("failed to read config entries from default config files: %v", err)
+ if err := c.initializeConfig(fs); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (c *config) initializeConfig(fs *pflag.FlagSet) error {
+ v := c.viper
+
+ // Set the base name of the config file, without the file extension.
+ v.SetConfigName(c.name)
+
+ // Set as many paths as you like where viper should look for the
+ // config file. We are only looking in the current working directory.
+ v.AddConfigPath(".")
+
+ // Attempt to read the config file, gracefully ignoring errors
+ // caused by a config file not being found. Return an error
+ // if we cannot parse the config file.
+ if err := v.ReadInConfig(); err != nil {
+ // It's okay if there isn't a config file
+ if _, ok := err.(viper.ConfigFileNotFoundError); !ok {
+ return err
+ }
}
+
+ // When we bind flags to environment variables expect that the
+ // environment variables are prefixed, e.g. a flag like --number
+ // binds to an environment variable STING_NUMBER. This helps
+ // avoid conflicts.
+ v.SetEnvPrefix(envPrefix)
+
+ // Bind to environment variables
+ // Works great for simple config names, but needs help for names
+ // like --favorite-color which we fix in the bindFlags function
v.AutomaticEnv()
- v.SetEnvPrefix("BYDB")
- return c, nil
+
+ // Bind the current command's flags to viper
+ return bindFlags(fs, v)
+}
+
+// Bind each cobra flag to its associated viper configuration (config file and environment variable)
+func bindFlags(fs *pflag.FlagSet, v *viper.Viper) error {
+ var err error
+ fs.VisitAll(func(f *pflag.Flag) {
+ // Environment variables can't have dashes in them, so bind them to their equivalent
+ // keys with underscores.
+ if strings.Contains(f.Name, ".") {
+ envVarSuffix := strings.ToUpper(strings.ReplaceAll(f.Name, ".", "_"))
+ err = multierr.Append(err, v.BindEnv(f.Name, fmt.Sprintf("%s_%s", envPrefix, envVarSuffix)))
+ }
+
+ // Apply the viper config value to the flag when the flag is not set and viper has a value
+ if !f.Changed && v.IsSet(f.Name) {
+ val := v.Get(f.Name)
+ err = multierr.Append(err, fs.Set(f.Name, fmt.Sprintf("%v", val)))
+ }
+ })
+ return err
}
diff --git a/pkg/config/logging.go b/pkg/config/logging.go
deleted file mode 100644
index 0ddb698..0000000
--- a/pkg/config/logging.go
+++ /dev/null
@@ -1,23 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package config
-
-type Logging struct {
- Env string
- Level string
-}
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 8616612..8195056 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -22,6 +22,12 @@ import (
"go.uber.org/zap/zapcore"
)
+// Logging is the config info
+type Logging struct {
+ Env string
+ Level string
+}
+
// Logger is wrapper for zap logger with module, it is singleton.
type Logger struct {
module string
diff --git a/pkg/logger/setting.go b/pkg/logger/setting.go
index bae8827..658b928 100644
--- a/pkg/logger/setting.go
+++ b/pkg/logger/setting.go
@@ -23,8 +23,6 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
-
- "github.com/apache/skywalking-banyandb/pkg/config"
)
var (
@@ -41,8 +39,22 @@ func GetLogger(scope ...string) *Logger {
return &Logger{module: module, Logger: root.Logger.Named(module)}
}
-// InitLogger initializes a zap logger from user config
-func InitLogger(cfg config.Logging) (err error) {
+// Bootstrap logging for system boot
+func Bootstrap() (err error) {
+ once.Do(func() {
+ root, err = getLogger(Logging{
+ Env: "dev",
+ Level: "debug",
+ })
+ })
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// Init initializes a zap logger from user config
+func Init(cfg Logging) (err error) {
once.Do(func() {
root, err = getLogger(cfg)
})
@@ -53,7 +65,7 @@ func InitLogger(cfg config.Logging) (err error) {
}
// getLogger initializes a root logger
-func getLogger(cfg config.Logging) (*Logger, error) {
+func getLogger(cfg Logging) (*Logger, error) {
// parse logging level
level := zap.NewAtomicLevelAt(zapcore.InfoLevel)
if err := level.UnmarshalText([]byte(cfg.Level)); err != nil {
diff --git a/pkg/run/run.go b/pkg/run/run.go
index ccee525..7517657 100644
--- a/pkg/run/run.go
+++ b/pkg/run/run.go
@@ -25,7 +25,9 @@ import (
"github.com/oklog/run"
"github.com/spf13/pflag"
"go.uber.org/multierr"
+ "go.uber.org/zap"
+ "github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -124,8 +126,9 @@ type Group struct {
s []Service
log *logger.Logger
- configured bool
- hsRegistered bool
+ showRunGroup bool
+
+ configured bool
}
// Register will inspect the provided objects implementing the Unit interface to
@@ -136,6 +139,7 @@ type Group struct {
// Units, signalling for each provided Unit if it successfully registered with
// Group for at least one of the bootstrap phases or if it was ignored.
func (g *Group) Register(units ...Unit) []bool {
+ g.log = logger.GetLogger(g.Name)
hasRegistered := make([]bool, len(units))
for idx := range units {
if !g.configured {
@@ -158,30 +162,7 @@ func (g *Group) Register(units ...Unit) []bool {
return hasRegistered
}
-// RunConfig runs the Config phase of all registered Config aware Units.
-// Only use this function if needing to add additional wiring between config
-// and (pre)run phases and a separate PreRunner phase is not an option.
-// In most cases it is best to use the Run method directly as it will run the
-// Config phase prior to executing the PreRunner and Service phases.
-// If an error is returned the application must shut down as it is considered
-// fatal.
-func (g *Group) RunConfig(args ...string) (err error) {
-
- g.configured = true
-
- if g.Name == "" {
- // use the binary name if custom name has not been provided
- g.Name = path.Base(os.Args[0])
- }
-
- g.log = logger.GetLogger(g.Name)
-
- defer func() {
- if err != nil {
- g.log.Error("unexpected exit", logger.Error(err))
- }
- }()
-
+func (g *Group) RegisterFlags() *FlagSet {
// run configuration stage
g.f = NewFlagSet(g.Name)
g.f.SortFlags = false // keep order of flag registration
@@ -190,32 +171,12 @@ func (g *Group) RunConfig(args ...string) (err error) {
g.f.PrintDefaults()
}
- // register default rungroup flags
- var (
- name string
- showVersion bool
- showRunGroup bool
- )
-
gFS := NewFlagSet("Common Service options")
gFS.SortFlags = false
- gFS.StringVarP(&name, "name", "n", g.Name, `name of this service`)
- gFS.BoolVarP(&showVersion, "version", "v", false,
- "show version information and exit.")
- gFS.BoolVar(&showRunGroup, "show-rungroup-units", false, "show rungroup units")
+ gFS.StringVarP(&g.Name, "name", "n", g.Name, `name of this service`)
+ gFS.BoolVar(&g.showRunGroup, "show-rungroup-units", false, "show rungroup units")
g.f.AddFlagSet(gFS.FlagSet)
- // default to os.Args if args parameter was omitted
- if len(args) == 0 {
- args = os.Args[1:]
- }
-
- // parse our rungroup flags only (not the plugin ones)
- _ = gFS.Parse(args)
- if name != "" {
- g.Name = name
- }
-
// register flags from attached Config objects
fs := make([]*FlagSet, len(g.c))
for idx := range g.c {
@@ -224,7 +185,7 @@ func (g *Group) RunConfig(args ...string) (err error) {
continue
}
nameField := logger.String("name", g.c[idx].Name())
- indexField := logger.Uint32("index", uint32(idx))
+ indexField := logger.Uint32("registered", uint32(idx+1))
g.log.Debug("register flags", nameField, indexField,
logger.Uint32("total", uint32(len(g.c))))
fs[idx] = g.c[idx].FlagSet()
@@ -242,28 +203,52 @@ func (g *Group) RunConfig(args ...string) (err error) {
g.f.AddFlag(f)
})
}
+ return g.f
+}
- // parse FlagSet and exit on error
- if err = g.f.Parse(args); err != nil {
- return err
+// RunConfig runs the Config phase of all registered Config aware Units.
+// Only use this function if needing to add additional wiring between config
+// and (pre)run phases and a separate PreRunner phase is not an option.
+// In most cases it is best to use the Run method directly as it will run the
+// Config phase prior to executing the PreRunner and Service phases.
+// If an error is returned the application must shut down as it is considered
+// fatal.
+func (g *Group) RunConfig() (interrupted bool, err error) {
+ g.log = logger.GetLogger(g.Name)
+ g.configured = true
+
+ if g.Name == "" {
+ // use the binary name if custom name has not been provided
+ g.Name = path.Base(os.Args[0])
+ }
+
+ defer func() {
+ if err != nil {
+ g.log.Error("unexpected exit", logger.Error(err))
+ }
+ }()
+
+ // Load config from env and file
+ if err = config.Load(g.f.Name, g.f.FlagSet); err != nil {
+ return false, err
}
// bail early on help or version requests
switch {
- case showRunGroup:
+ case g.showRunGroup:
fmt.Println(g.ListUnits())
- return nil
+ return true, nil
}
// Validate Config inputs
for idx := range g.c {
// a Config might have been deregistered during Run
- indexField := logger.Uint32("index", uint32(idx))
+ indexField := logger.Uint32("ran", uint32(idx+1))
if g.c[idx] == nil {
g.log.Debug("skipping validate", indexField)
continue
}
- g.log.Debug("validate config: %s (%d/%d)", logger.String("name", g.c[idx].Name()), indexField,
+ g.log.Debug("validate config", logger.String("name", g.c[idx].Name()), indexField,
logger.Uint32("total", uint32(len(g.c))))
if vErr := g.c[idx].Validate(); vErr != nil {
err = multierr.Append(err, vErr)
@@ -272,13 +257,13 @@ func (g *Group) RunConfig(args ...string) (err error) {
// exit on at least one Validate error
if err != nil {
- return err
+ return false, err
}
// log binary name and version
g.log.Info("started")
- return nil
+ return false, nil
}
// Run will execute all phases of all registered Units and block until an error
@@ -306,14 +291,14 @@ func (g *Group) RunConfig(args ...string) (err error) {
// - first PreRunner.PreRun() returning an error
// - first Service.Serve() returning (error or nil)
//
-func (g *Group) Run(args ...string) (err error) {
+func (g *Group) Run() (err error) {
// run config registration and flag parsing stages
- if err = g.RunConfig(args...); err != nil {
+ if interrupted, err := g.RunConfig(); interrupted || err != nil {
return err
}
defer func() {
if err != nil {
- g.log.Error("unexpected exit", logger.Error(err))
+ g.log.WithOptions(zap.AddStacktrace(zap.FatalLevel)).Error("unexpected exit", logger.Error(err))
}
}()