You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/04/20 06:19:59 UTC

[skywalking-banyandb] branch main updated: Improve config system

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 8df75ef  Improve config system
     new cf0b607  Merge branch 'main' of github.com:apache/skywalking-banyandb into main
8df75ef is described below

commit 8df75ef3af1f4621e0ab0ee003afa3c56af952ac
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Apr 20 14:16:42 2021 +0800

    Improve config system
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/config/default/standalone.yaml |  20 ------
 banyand/config/standalone.go           |  45 --------------
 banyand/internal/cmd/standalone.go     |  70 ++++++++++-----------
 banyand/storage/pipeline.go            |  15 ++++-
 pkg/config/config.go                   |  81 +++++++++++++++++++++----
 pkg/config/logging.go                  |  23 -------
 pkg/logger/logger.go                   |   6 ++
 pkg/logger/setting.go                  |  22 +++++--
 pkg/run/run.go                         | 107 ++++++++++++++-------------------
 9 files changed, 188 insertions(+), 201 deletions(-)

diff --git a/banyand/config/default/standalone.yaml b/banyand/config/default/standalone.yaml
deleted file mode 100644
index 33d0cc6..0000000
--- a/banyand/config/default/standalone.yaml
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to Apache Software Foundation (ASF) under one or more contributor
-# license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright
-# ownership. Apache Software Foundation (ASF) licenses this file to you under
-# the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-logging:
-  env: dev
-  level: info
diff --git a/banyand/config/standalone.go b/banyand/config/standalone.go
deleted file mode 100644
index 5b5c502..0000000
--- a/banyand/config/standalone.go
+++ /dev/null
@@ -1,45 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package config
-
-import (
-	_ "embed"
-	"fmt"
-
-	"github.com/apache/skywalking-banyandb/pkg/config"
-)
-
-//go:embed default/standalone.yaml
-var standaloneDefault string
-
-type Standalone struct {
-	Logging config.Logging
-}
-
-func Load() (Standalone, error) {
-	var c *config.Config
-	var err error
-	var s Standalone
-	if c, err = config.NewConfig(standaloneDefault); err != nil {
-		return s, fmt.Errorf("failed to initialize config system:%v", err)
-	}
-	if err = c.Unmarshal(&s); err != nil {
-		return s, fmt.Errorf("failed to unmarshal config to standalone config")
-	}
-	return s, nil
-}
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 8e0ea44..60653cc 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -21,13 +21,14 @@ import (
 	"os"
 
 	"github.com/spf13/cobra"
+	"go.uber.org/zap"
 
-	"github.com/apache/skywalking-banyandb/banyand/config"
 	executor2 "github.com/apache/skywalking-banyandb/banyand/executor"
 	index2 "github.com/apache/skywalking-banyandb/banyand/index"
 	series2 "github.com/apache/skywalking-banyandb/banyand/series"
 	shard2 "github.com/apache/skywalking-banyandb/banyand/shard"
 	"github.com/apache/skywalking-banyandb/banyand/storage"
+	"github.com/apache/skywalking-banyandb/pkg/config"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 	"github.com/apache/skywalking-banyandb/pkg/signal"
@@ -35,58 +36,59 @@ import (
 )
 
 var (
-	standAloneConfig config.Standalone
-	g                = run.Group{Name: "standalone"}
+	g = run.Group{Name: "standalone"}
 )
 
 func newStandaloneCmd() *cobra.Command {
+	_ = logger.Bootstrap()
+	engine := new(storage.Pipeline)
+	shard := new(shard2.Shard)
+	executor := new(executor2.Executor)
+	index := new(index2.Index)
+	series := new(series2.Series)
+
+	// Register the storage engine components.
+	engine.Register(
+		shard,
+		executor,
+		index,
+		series,
+	)
+
+	// Register the run Group units.
+	g.Register(
+		new(signal.Handler),
+		engine,
+		shard,
+		executor,
+		index,
+		series,
+	)
+	logging := logger.Logging{}
 	standaloneCmd := &cobra.Command{
 		Use:     "standalone",
 		Version: version.Build(),
 		Short:   "Run as the standalone mode",
 		PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) {
-			if standAloneConfig, err = config.Load(); err != nil {
-				return err
-			}
-			if err = logger.InitLogger(standAloneConfig.Logging); err != nil {
+			if err = config.Load("logging", cmd.Flags()); err != nil {
 				return err
 			}
-			return nil
+			return logger.Init(logging)
 		},
 		RunE: func(cmd *cobra.Command, args []string) (err error) {
 			logger.GetLogger().Info("starting as a standalone server")
-			engine := new(storage.Pipeline)
-			shard := new(shard2.Shard)
-			executor := new(executor2.Executor)
-			index := new(index2.Index)
-			series := new(series2.Series)
-
-			// Register the storage engine components.
-			engine.Register(
-				shard,
-				executor,
-				index,
-				series,
-			)
-
-			// Register the run Group units.
-			g.Register(
-				new(signal.Handler),
-				engine,
-				shard,
-				executor,
-				index,
-				series,
-			)
-
 			// Spawn our go routines and wait for shutdown.
-			if err := g.Run(args...); err != nil {
-				logger.GetLogger().Error("exit: ", logger.String("name", g.Name), logger.Error(err))
+			if err := g.Run(); err != nil {
+				logger.GetLogger().WithOptions(zap.AddStacktrace(zap.FatalLevel)).
+					Error("exit: ", logger.String("name", g.Name), logger.Error(err))
 				os.Exit(-1)
 			}
 			return nil
 		},
 	}
 
+	standaloneCmd.Flags().StringVarP(&logging.Env, "logging.env", "", "dev", "the logging")
+	standaloneCmd.Flags().StringVarP(&logging.Level, "logging.level", "", "debug", "the level of logging")
+	standaloneCmd.Flags().AddFlagSet(g.RegisterFlags().FlagSet)
 	return standaloneCmd
 }
diff --git a/banyand/storage/pipeline.go b/banyand/storage/pipeline.go
index 6f51c84..494f34f 100644
--- a/banyand/storage/pipeline.go
+++ b/banyand/storage/pipeline.go
@@ -49,20 +49,33 @@ type DataPublisher interface {
 }
 
 var _ run.PreRunner = (*Pipeline)(nil)
+var _ run.Config = (*Pipeline)(nil)
 
 type Pipeline struct {
 	logger  *logger.Logger
+	test    string
 	dataBus *bus.Bus
 	dps     []DataPublisher
 	dss     []DataSubscriber
 }
 
+func (e *Pipeline) FlagSet() *run.FlagSet {
+	e.logger = logger.GetLogger(name)
+	fs := run.NewFlagSet("storage")
+	fs.StringVarP(&e.test, "storage.test", "", "a", "test config")
+	return fs
+}
+
+func (e *Pipeline) Validate() error {
+	e.logger.Info("test", logger.String("val", e.test))
+	return nil
+}
+
 func (e Pipeline) Name() string {
 	return name
 }
 
 func (e *Pipeline) PreRun() error {
-	e.logger = logger.GetLogger(name)
 	var err error
 	e.dataBus = bus.NewBus()
 	for _, dp := range e.dps {
diff --git a/pkg/config/config.go b/pkg/config/config.go
index b6d8f8d..1c71ce2 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -21,26 +21,83 @@ import (
 	"fmt"
 	"strings"
 
+	"github.com/spf13/pflag"
 	"github.com/spf13/viper"
+	"go.uber.org/multierr"
 )
 
-type Config struct {
-	viper *viper.Viper
-}
+const (
+	// The environment variable prefix of all environment variables bound to our command line flags.
+	envPrefix = "BYDB"
+)
 
-func (c *Config) Unmarshal(config interface{}) error {
-	return c.viper.Unmarshal(config)
+type config struct {
+	name  string
+	viper *viper.Viper
 }
 
-func NewConfig(defaultConfig string) (*Config, error) {
-	c := new(Config)
+func Load(name string, fs *pflag.FlagSet) error {
+	c := new(config)
 	v := viper.New()
+	c.name = name
 	c.viper = v
-	v.SetConfigType("yaml")
-	if err := v.ReadConfig(strings.NewReader(defaultConfig)); err != nil {
-		return nil, fmt.Errorf("failed to read config entries from default config files: %v", err)
+	if err := c.initializeConfig(fs); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (c *config) initializeConfig(fs *pflag.FlagSet) error {
+	v := c.viper
+
+	// Set the base name of the config file, without the file extension.
+	v.SetConfigName(c.name)
+
+	// Set as many paths as you like where viper should look for the
+	// config file. We are only looking in the current working directory.
+	v.AddConfigPath(".")
+
+	// Attempt to read the config file, gracefully ignoring errors
+	// caused by a config file not being found. Return an error
+	// if we cannot parse the config file.
+	if err := v.ReadInConfig(); err != nil {
+		// It's okay if there isn't a config file
+		if _, ok := err.(viper.ConfigFileNotFoundError); !ok {
+			return err
+		}
 	}
+
+	// When we bind flags to environment variables expect that the
+	// environment variables are prefixed, e.g. a flag like --number
+	// binds to an environment variable BYDB_NUMBER. This helps
+	// avoid conflicts.
+	v.SetEnvPrefix(envPrefix)
+
+	// Bind to environment variables.
+	// Works great for simple config names, but needs help for dotted names
+	// like --logging.level which we fix in the bindFlags function.
 	v.AutomaticEnv()
-	v.SetEnvPrefix("BYDB")
-	return c, nil
+
+	// Bind the current command's flags to viper
+	return bindFlags(fs, v)
+}
+
+// bindFlags binds each flag in fs to its associated viper configuration (config file and environment variable).
+func bindFlags(fs *pflag.FlagSet, v *viper.Viper) error {
+	var err error
+	fs.VisitAll(func(f *pflag.Flag) {
+		// Environment variable names can't portably contain dots, so bind dotted flag names to
+		// their prefixed equivalents with underscores, e.g. logging.level -> BYDB_LOGGING_LEVEL.
+		if strings.Contains(f.Name, ".") {
+			envVarSuffix := strings.ToUpper(strings.ReplaceAll(f.Name, ".", "_"))
+			err = multierr.Append(err, v.BindEnv(f.Name, fmt.Sprintf("%s_%s", envPrefix, envVarSuffix)))
+		}
+
+		// Apply the viper config value to the flag when the flag is not set and viper has a value
+		if !f.Changed && v.IsSet(f.Name) {
+			val := v.Get(f.Name)
+			err = multierr.Append(err, fs.Set(f.Name, fmt.Sprintf("%v", val)))
+		}
+	})
+	return err
 }
diff --git a/pkg/config/logging.go b/pkg/config/logging.go
deleted file mode 100644
index 0ddb698..0000000
--- a/pkg/config/logging.go
+++ /dev/null
@@ -1,23 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package config
-
-type Logging struct {
-	Env   string
-	Level string
-}
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 8616612..8195056 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -22,6 +22,12 @@ import (
 	"go.uber.org/zap/zapcore"
 )
 
+// Logging is the config info
+type Logging struct {
+	Env   string
+	Level string
+}
+
 // Logger is wrapper for zap logger with module, it is singleton.
 type Logger struct {
 	module string
diff --git a/pkg/logger/setting.go b/pkg/logger/setting.go
index bae8827..658b928 100644
--- a/pkg/logger/setting.go
+++ b/pkg/logger/setting.go
@@ -23,8 +23,6 @@ import (
 
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
-
-	"github.com/apache/skywalking-banyandb/pkg/config"
 )
 
 var (
@@ -41,8 +39,22 @@ func GetLogger(scope ...string) *Logger {
 	return &Logger{module: module, Logger: root.Logger.Named(module)}
 }
 
-// InitLogger initializes a zap logger from user config
-func InitLogger(cfg config.Logging) (err error) {
+// Bootstrap logging for system boot
+func Bootstrap() (err error) {
+	once.Do(func() {
+		root, err = getLogger(Logging{
+			Env:   "dev",
+			Level: "debug",
+		})
+	})
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// Init initializes a zap logger from user config
+func Init(cfg Logging) (err error) {
 	once.Do(func() {
 		root, err = getLogger(cfg)
 	})
@@ -53,7 +65,7 @@ func InitLogger(cfg config.Logging) (err error) {
 }
 
 // getLogger initializes a root logger
-func getLogger(cfg config.Logging) (*Logger, error) {
+func getLogger(cfg Logging) (*Logger, error) {
 	// parse logging level
 	level := zap.NewAtomicLevelAt(zapcore.InfoLevel)
 	if err := level.UnmarshalText([]byte(cfg.Level)); err != nil {
diff --git a/pkg/run/run.go b/pkg/run/run.go
index ccee525..7517657 100644
--- a/pkg/run/run.go
+++ b/pkg/run/run.go
@@ -25,7 +25,9 @@ import (
 	"github.com/oklog/run"
 	"github.com/spf13/pflag"
 	"go.uber.org/multierr"
+	"go.uber.org/zap"
 
+	"github.com/apache/skywalking-banyandb/pkg/config"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -124,8 +126,9 @@ type Group struct {
 	s   []Service
 	log *logger.Logger
 
-	configured   bool
-	hsRegistered bool
+	showRunGroup bool
+
+	configured bool
 }
 
 // Register will inspect the provided objects implementing the Unit interface to
@@ -136,6 +139,7 @@ type Group struct {
 // Units, signalling for each provided Unit if it successfully registered with
 // Group for at least one of the bootstrap phases or if it was ignored.
 func (g *Group) Register(units ...Unit) []bool {
+	g.log = logger.GetLogger(g.Name)
 	hasRegistered := make([]bool, len(units))
 	for idx := range units {
 		if !g.configured {
@@ -158,30 +162,7 @@ func (g *Group) Register(units ...Unit) []bool {
 	return hasRegistered
 }
 
-// RunConfig runs the Config phase of all registered Config aware Units.
-// Only use this function if needing to add additional wiring between config
-// and (pre)run phases and a separate PreRunner phase is not an option.
-// In most cases it is best to use the Run method directly as it will run the
-// Config phase prior to executing the PreRunner and Service phases.
-// If an error is returned the application must shut down as it is considered
-// fatal.
-func (g *Group) RunConfig(args ...string) (err error) {
-
-	g.configured = true
-
-	if g.Name == "" {
-		// use the binary name if custom name has not been provided
-		g.Name = path.Base(os.Args[0])
-	}
-
-	g.log = logger.GetLogger(g.Name)
-
-	defer func() {
-		if err != nil {
-			g.log.Error("unexpected exit", logger.Error(err))
-		}
-	}()
-
+func (g *Group) RegisterFlags() *FlagSet {
 	// run configuration stage
 	g.f = NewFlagSet(g.Name)
 	g.f.SortFlags = false // keep order of flag registration
@@ -190,32 +171,12 @@ func (g *Group) RunConfig(args ...string) (err error) {
 		g.f.PrintDefaults()
 	}
 
-	// register default rungroup flags
-	var (
-		name         string
-		showVersion  bool
-		showRunGroup bool
-	)
-
 	gFS := NewFlagSet("Common Service options")
 	gFS.SortFlags = false
-	gFS.StringVarP(&name, "name", "n", g.Name, `name of this service`)
-	gFS.BoolVarP(&showVersion, "version", "v", false,
-		"show version information and exit.")
-	gFS.BoolVar(&showRunGroup, "show-rungroup-units", false, "show rungroup units")
+	gFS.StringVarP(&g.Name, "name", "n", g.Name, `name of this service`)
+	gFS.BoolVar(&g.showRunGroup, "show-rungroup-units", false, "show rungroup units")
 	g.f.AddFlagSet(gFS.FlagSet)
 
-	// default to os.Args if args parameter was omitted
-	if len(args) == 0 {
-		args = os.Args[1:]
-	}
-
-	// parse our rungroup flags only (not the plugin ones)
-	_ = gFS.Parse(args)
-	if name != "" {
-		g.Name = name
-	}
-
 	// register flags from attached Config objects
 	fs := make([]*FlagSet, len(g.c))
 	for idx := range g.c {
@@ -224,7 +185,7 @@ func (g *Group) RunConfig(args ...string) (err error) {
 			continue
 		}
 		nameField := logger.String("name", g.c[idx].Name())
-		indexField := logger.Uint32("index", uint32(idx))
+		indexField := logger.Uint32("registered", uint32(idx+1))
 		g.log.Debug("register flags", nameField, indexField,
 			logger.Uint32("total", uint32(len(g.c))))
 		fs[idx] = g.c[idx].FlagSet()
@@ -242,28 +203,52 @@ func (g *Group) RunConfig(args ...string) (err error) {
 			g.f.AddFlag(f)
 		})
 	}
+	return g.f
+}
 
-	// parse FlagSet and exit on error
-	if err = g.f.Parse(args); err != nil {
-		return err
+// RunConfig runs the Config phase of all registered Config aware Units.
+// Only use this function if needing to add additional wiring between config
+// and (pre)run phases and a separate PreRunner phase is not an option.
+// In most cases it is best to use the Run method directly as it will run the
+// Config phase prior to executing the PreRunner and Service phases.
+// If an error is returned the application must shut down as it is considered
+// fatal.
+func (g *Group) RunConfig() (interrupted bool, err error) {
+	g.log = logger.GetLogger(g.Name)
+	g.configured = true
+
+	if g.Name == "" {
+		// use the binary name if custom name has not been provided
+		g.Name = path.Base(os.Args[0])
+	}
+
+	defer func() {
+		if err != nil {
+			g.log.Error("unexpected exit", logger.Error(err))
+		}
+	}()
+
+	// Load config from env and file
+	if err = config.Load(g.f.Name, g.f.FlagSet); err != nil {
+		return false, err
 	}
 
 	// bail early on help or version requests
 	switch {
-	case showRunGroup:
+	case g.showRunGroup:
 		fmt.Println(g.ListUnits())
-		return nil
+		return true, nil
 	}
 
 	// Validate Config inputs
 	for idx := range g.c {
 		// a Config might have been deregistered during Run
-		indexField := logger.Uint32("index", uint32(idx))
+		indexField := logger.Uint32("ran", uint32(idx+1))
 		if g.c[idx] == nil {
 			g.log.Debug("skipping validate", indexField)
 			continue
 		}
-		g.log.Debug("validate config: %s (%d/%d)", logger.String("name", g.c[idx].Name()), indexField,
+		g.log.Debug("validate config", logger.String("name", g.c[idx].Name()), indexField,
 			logger.Uint32("total", uint32(len(g.c))))
 		if vErr := g.c[idx].Validate(); vErr != nil {
 			err = multierr.Append(err, vErr)
@@ -272,13 +257,13 @@ func (g *Group) RunConfig(args ...string) (err error) {
 
 	// exit on at least one Validate error
 	if err != nil {
-		return err
+		return false, err
 	}
 
 	// log binary name and version
 	g.log.Info("started")
 
-	return nil
+	return false, nil
 }
 
 // Run will execute all phases of all registered Units and block until an error
@@ -306,14 +291,14 @@ func (g *Group) RunConfig(args ...string) (err error) {
 //   - first PreRunner.PreRun() returning an error
 //   - first Service.Serve()    returning (error or nil)
 //
-func (g *Group) Run(args ...string) (err error) {
+func (g *Group) Run() (err error) {
 	// run config registration and flag parsing stages
-	if err = g.RunConfig(args...); err != nil {
+	if interrupted, err := g.RunConfig(); interrupted || err != nil {
 		return err
 	}
 	defer func() {
 		if err != nil {
-			g.log.Error("unexpected exit", logger.Error(err))
+			g.log.WithOptions(zap.AddStacktrace(zap.FatalLevel)).Error("unexpected exit", logger.Error(err))
 		}
 	}()