You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2023/07/18 04:13:38 UTC
[dubbo-admin] 03/08: refactor, introduce component concept to admin
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch refactor-with-go-components-experimental
in repository https://gitbox.apache.org/repos/asf/dubbo-admin.git
commit 934c27f05a7ace7246d819b336f4b28c1095d52c
Author: chickenlj <ke...@gmail.com>
AuthorDate: Mon Jul 3 11:09:22 2023 +0800
refactor, introduce component concept to admin
---
.run/Admin.run.xml | 41 +--
README.md | 2 +-
README_ZH.md | 2 +-
cmd/admin/README.md | 2 +-
cmd/admin/cmd/root.go | 45 ++++
cmd/admin/cmd/run.go | 93 +++++++
cmd/admin/main.go | 28 +--
cmd/authority/app/authority.go | 84 +------
conf/{dubboadmin.yml => admin.yml} | 0
deploy/charts/dubbo-admin/templates/configmap.yaml | 85 +------
.../charts/dubbo-admin/templates/deployment.yaml | 3 +
deploy/charts/dubbo-admin/values.yaml | 39 +--
pkg/admin/config/config.go | 2 +-
pkg/authority/security/server.go | 1 +
.../app/authority.go => pkg/authority/setup.go | 57 +----
pkg/core/alias.go | 43 ++++
pkg/core/cmd/helpers.go | 14 ++
pkg/core/cmd/util.go | 17 ++
pkg/{authority/security => core/rule}/server.go | 3 +-
pkg/core/rule/server_test.go | 209 ++++++++++++++++
pkg/core/runtime/builder.go | 117 +++++++++
pkg/core/runtime/component/component.go | 163 ++++++++++++
pkg/core/runtime/component/leader.go | 53 ++++
pkg/core/runtime/component/resilient.go | 60 +++++
pkg/core/runtime/reports/reports.go | 277 +++++++++++++++++++++
pkg/core/runtime/runtime.go | 87 +++++++
pkg/version/version.go | 24 ++
27 files changed, 1275 insertions(+), 276 deletions(-)
diff --git a/.run/Admin.run.xml b/.run/Admin.run.xml
index 06768acd..484d9ec0 100644
--- a/.run/Admin.run.xml
+++ b/.run/Admin.run.xml
@@ -1,31 +1,14 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
--->
-
<component name="ProjectRunConfigurationManager">
- <configuration default="false" name="Admin" type="GoApplicationRunConfiguration" factoryName="Go Application">
- <module name="dubbo-admin" />
- <working_directory value="$PROJECT_DIR$" />
- <envs>
- <env name="ADMIN_CONFIG_PATH" value="conf/dubboadmin.yml" />
- </envs>
- <kind value="PACKAGE" />
- <package value="github.com/apache/dubbo-admin/cmd/admin" />
- <directory value="$PROJECT_DIR$" />
- <filePath value="$PROJECT_DIR$/cmd/admin/main.go" />
- <method v="2" />
- </configuration>
+ <configuration default="false" name="Admin" type="GoApplicationRunConfiguration" factoryName="Go Application">
+ <module name="dubbo-admin" />
+ <working_directory value="$PROJECT_DIR$" />
+ <envs>
+ <env name="ADMIN_CONFIG_PATH" value="conf/admin.yml" />
+ </envs>
+ <kind value="PACKAGE" />
+ <package value="github.com/apache/dubbo-admin/cmd/admin" />
+ <directory value="$PROJECT_DIR$" />
+ <filePath value="$PROJECT_DIR$/cmd/admin/main.go" />
+ <method v="2" />
+ </configuration>
</component>
\ No newline at end of file
diff --git a/README.md b/README.md
index c9c27373..02c97ea3 100644
--- a/README.md
+++ b/README.md
@@ -29,7 +29,7 @@ admin:
### Run with command line
```shell
-$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/dubboadmin.yml
+$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/admin.yml
$ cd cmd/admin
$ go run .
```
diff --git a/README_ZH.md b/README_ZH.md
index 617ea956..ac07ecf9 100644
--- a/README_ZH.md
+++ b/README_ZH.md
@@ -27,7 +27,7 @@ admin:
### Run with command line
```shell
-$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/dubboadmin.yml
+$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/admin.yml
$ cd cmd/admin
$ go run .
```
diff --git a/cmd/admin/README.md b/cmd/admin/README.md
index 851fb655..e4199f9a 100644
--- a/cmd/admin/README.md
+++ b/cmd/admin/README.md
@@ -24,7 +24,7 @@ admin:
### Run with command line
```shell
-$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/dubboadmin.yml
+$ export ADMIN_CONFIG_PATH=/path/to/your/admin/project/conf/admin.yml
$ cd cmd/admin
$ go run .
```
diff --git a/cmd/admin/cmd/root.go b/cmd/admin/cmd/root.go
new file mode 100644
index 00000000..58a073d5
--- /dev/null
+++ b/cmd/admin/cmd/root.go
@@ -0,0 +1,45 @@
+package cmd
+
+import (
+ "github.com/apache/dubbo-admin/pkg/logger"
+ "github.com/apache/dubbo-admin/pkg/version"
+
+ corecmd "github.com/apache/dubbo-admin/pkg/core/cmd"
+
+ "github.com/spf13/cobra"
+ "os"
+)
+
+func GetRootCmd(args []string) *cobra.Command {
+ // rootCmd represents the base command when called without any subcommands
+ cmd := &cobra.Command{
+ Use: "dubbo-admin",
+ Short: "Console and control plane for microservices built with Apache Dubbo.",
+ Long: `Console and control plane for microservices built with Apache Dubbo.`,
+
+ PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
+ logger.Init()
+
+ // once command line flags have been parsed,
+ // avoid printing usage instructions
+ cmd.SilenceUsage = true
+
+ return nil
+ },
+ }
+
+ cmd.SetOut(os.Stdout)
+
+ // root flags
+ //cmd.PersistentFlags().StringVar(&args.logLevel, "log-level", kuma_log.InfoLevel.String(), kuma_cmd.UsageOptions("log level", kuma_log.OffLevel, kuma_log.InfoLevel, kuma_log.DebugLevel))
+ //cmd.PersistentFlags().StringVar(&args.outputPath, "log-output-path", args.outputPath, "path to the file that will be filled with logs. Example: if we set it to /tmp/admin.log then after the file is rotated we will have /tmp/admin-2021-06-07T09-15-18.265.log")
+ //cmd.PersistentFlags().IntVar(&args.maxBackups, "log-max-retained-files", 1000, "maximum number of the old log files to retain")
+ //cmd.PersistentFlags().IntVar(&args.maxSize, "log-max-size", 100, "maximum size in megabytes of a log file before it gets rotated")
+ //cmd.PersistentFlags().IntVar(&args.maxAge, "log-max-age", 30, "maximum number of days to retain old log files based on the timestamp encoded in their filename")
+
+ // sub-commands
+ cmd.AddCommand(newRunCmdWithOpts(corecmd.DefaultRunCmdOpts))
+ cmd.AddCommand(version.NewVersionCmd())
+
+ return cmd
+}
diff --git a/cmd/admin/cmd/run.go b/cmd/admin/cmd/run.go
new file mode 100755
index 00000000..b12a1b6c
--- /dev/null
+++ b/cmd/admin/cmd/run.go
@@ -0,0 +1,93 @@
+package cmd
+
+import (
+ "github.com/apache/dubbo-admin/pkg/admin/config"
+ "github.com/apache/dubbo-admin/pkg/admin/constant"
+ "github.com/apache/dubbo-admin/pkg/admin/providers/mock"
+ "github.com/apache/dubbo-admin/pkg/admin/router"
+ "github.com/apache/dubbo-admin/pkg/admin/services"
+ "github.com/apache/dubbo-admin/pkg/authority"
+ "github.com/apache/dubbo-admin/pkg/core/cmd"
+ "github.com/apache/dubbo-admin/pkg/logger"
+
+ caconfig "github.com/apache/dubbo-admin/pkg/authority/config"
+
+ "os"
+ "time"
+
+ "github.com/spf13/cobra"
+)
+
+const gracefullyShutdownDuration = 3 * time.Second
+
+// This is the open file limit below which the control plane may not
+// reasonably have enough descriptors to accept all its clients.
+const minOpenFileLimit = 4096
+
+func newRunCmdWithOpts(opts cmd.RunCmdOpts) *cobra.Command {
+ args := struct {
+ configPath string
+ }{}
+ cmd := &cobra.Command{
+ Use: "run",
+ Short: "Launch Dubbo Admin",
+ Long: `Launch Dubbo Admin.`,
+ RunE: func(cmd *cobra.Command, _ []string) error {
+ // init config
+ config.LoadConfig()
+
+ // subscribe to registries
+ go services.StartSubscribe(config.RegistryCenter)
+ defer func() {
+ services.DestroySubscribe(config.RegistryCenter)
+ }()
+
+ // start mock server
+ os.Setenv(constant.ConfigFileEnvKey, config.MockProviderConf)
+ go mock.RunMockServiceServer()
+
+ // start console server
+ router := router.InitRouter()
+ if err := router.Run(":38080"); err != nil {
+ logger.Error("Failed to start Admin console server.")
+ return err
+ }
+
+ // start CA
+ if err := startCA(cmd); err != nil {
+ logger.Error("Failed to start CA server.")
+ return err
+ }
+
+ // start
+
+ return nil
+ },
+ }
+
+ // flags
+ cmd.PersistentFlags().StringVarP(&args.configPath, "config-file", "c", "", "configuration file")
+
+ return cmd
+}
+
+func startCA(cmd *cobra.Command) error {
+ options := caconfig.NewOptions()
+
+ if err := authority.Initialize(cmd); err != nil {
+ logger.Fatal("Failed to initialize CA server.")
+ return err
+ }
+
+ logger.Infof("Authority command Run with options: %+v", options)
+ if errs := options.Validate(); len(errs) != 0 {
+ logger.Fatal(errs)
+ return errs[0]
+ }
+
+ if err := authority.Run(options); err != nil {
+ logger.Fatal(err)
+ return err
+ }
+ return nil
+}
diff --git a/cmd/admin/main.go b/cmd/admin/main.go
index 5bc1ffd7..6dc3e449 100644
--- a/cmd/admin/main.go
+++ b/cmd/admin/main.go
@@ -18,30 +18,14 @@
package main
import (
+ "fmt"
+ "github.com/apache/dubbo-admin/cmd/admin/cmd"
"os"
-
- "dubbo.apache.org/dubbo-go/v3/common/constant"
- "github.com/apache/dubbo-admin/pkg/admin/config"
- mock "github.com/apache/dubbo-admin/pkg/admin/providers/mock"
- "github.com/apache/dubbo-admin/pkg/admin/router"
- "github.com/apache/dubbo-admin/pkg/admin/services"
)
-// @title Dubbo-Admin API
-// @version 1.0
-// @description This is a dubbo-admin swagger ui server.
-// @license.name Apache 2.0
-// @license.url http://www.apache.org/licenses/LICENSE-2.0.html
-// @host 127.0.0.1:38080
-// @BasePath /
func main() {
- config.LoadConfig()
- go services.StartSubscribe(config.RegistryCenter)
- defer func() {
- services.DestroySubscribe(config.RegistryCenter)
- }()
- os.Setenv(constant.ConfigFileEnvKey, config.MockProviderConf)
- go mock.RunMockServiceServer()
- router := router.InitRouter()
- _ = router.Run(":38080")
+ if err := cmd.GetRootCmd(os.Args[1:]).Execute(); err != nil {
+ fmt.Fprintf(os.Stderr, "%v\n", err)
+ os.Exit(1)
+ }
}
diff --git a/cmd/authority/app/authority.go b/cmd/authority/app/authority.go
index c49fa6e1..922cba1b 100644
--- a/cmd/authority/app/authority.go
+++ b/cmd/authority/app/authority.go
@@ -17,26 +17,10 @@ package app
import (
"flag"
- "fmt"
- "os"
- "os/signal"
- "strings"
- "syscall"
-
+ "github.com/apache/dubbo-admin/pkg/authority"
"github.com/apache/dubbo-admin/pkg/authority/config"
- "github.com/apache/dubbo-admin/pkg/authority/security"
"github.com/apache/dubbo-admin/pkg/logger"
"github.com/spf13/cobra"
- "github.com/spf13/pflag"
- "github.com/spf13/viper"
-)
-
-var (
- // For example, --webhook-port is bound to DUBBO_WEBHOOK_PORT.
- envNamePrefix = "DUBBO"
-
- // Replace hyphenated flag names with camelCase
- replaceWithCamelCase = false
)
func NewAppCommand() *cobra.Command {
@@ -45,19 +29,26 @@ func NewAppCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "authority",
Long: `The authority app is responsible for controllers in dubbo authority`,
- PersistentPreRun: func(cmd *cobra.Command, args []string) {
+ PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
logger.Infof("Authority command PersistentPreRun")
- initialize(cmd)
+ if err := authority.Initialize(cmd); err != nil {
+ logger.Fatal("Failed to initialize CA server.")
+ return err
+ }
+ return nil
},
- Run: func(cmd *cobra.Command, args []string) {
+ RunE: func(cmd *cobra.Command, args []string) error {
logger.Infof("Authority command Run with options: %+v", options)
if errs := options.Validate(); len(errs) != 0 {
logger.Fatal(errs)
+ return errs[0]
}
- if err := Run(options); err != nil {
+ if err := authority.Run(options); err != nil {
logger.Fatal(err)
+ return err
}
+ return nil
},
}
@@ -65,54 +56,3 @@ func NewAppCommand() *cobra.Command {
options.FillFlags(cmd.Flags())
return cmd
}
-
-func Run(options *config.Options) error {
- s := security.NewServer(options)
-
- s.Init()
- s.Start()
-
- c := make(chan os.Signal, 1)
- signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
- signal.Notify(s.StopChan, syscall.SIGINT, syscall.SIGTERM)
- signal.Notify(s.CertStorage.GetStopChan(), syscall.SIGINT, syscall.SIGTERM)
-
- <-c
-
- return nil
-}
-
-func initialize(cmd *cobra.Command) error {
- v := viper.New()
-
- // For example, --webhook-port is bound to DUBBO_WEBHOOK_PORT.
- v.SetEnvPrefix(envNamePrefix)
-
- // keys with underscores, e.g. DUBBO-WEBHOOK-PORT to DUBBO_WEBHOOK_PORT
- v.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
-
- // Bind to environment variables
- v.AutomaticEnv()
-
- // Bind the current command's flags to viper
- bindFlags(cmd, v)
-
- return nil
-}
-
-func bindFlags(cmd *cobra.Command, v *viper.Viper) {
- cmd.Flags().VisitAll(func(f *pflag.Flag) {
- configName := f.Name
-
- // Replace hyphens with a camelCased string.
- if replaceWithCamelCase {
- configName = strings.ReplaceAll(f.Name, "-", "")
- }
-
- // Apply the viper config value to the flag when the flag is not set and viper has a value
- if !f.Changed && v.IsSet(configName) {
- val := v.Get(configName)
- cmd.Flags().Set(f.Name, fmt.Sprintf("%v", val))
- }
- })
-}
diff --git a/conf/dubboadmin.yml b/conf/admin.yml
similarity index 100%
rename from conf/dubboadmin.yml
rename to conf/admin.yml
diff --git a/deploy/charts/dubbo-admin/templates/configmap.yaml b/deploy/charts/dubbo-admin/templates/configmap.yaml
index 48616278..cb36a05b 100644
--- a/deploy/charts/dubbo-admin/templates/configmap.yaml
+++ b/deploy/charts/dubbo-admin/templates/configmap.yaml
@@ -13,16 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-{{- $defaultsessionTimeoutMilli := 3600000 -}}
-{{- $defaulttokenTimeoutMilli := 3600000 -}}
-{{- $defaultsignSecret := "86295dd0c4ef69a1036b0b0c15158d77" -}}
-{{- $defaulttoken := "e16e5cd903fd0c97a116c873b448544b9d086de9" -}}
-{{- $defaultname := "dubbo-admin" -}}
-{{- $defaultdriverclassname := "com.mysql.jdbc.Driver" -}}
-{{- $apollotoken := (coalesce .Values.apollo.token $defaulttoken) -}}
-{{- $dubboname := (coalesce .Values.dubbo.application.name $defaultname) -}}
-{{- $springdriverclassname := (coalesce .Values.spring.datasource.driverclassname $defaultdriverclassname) -}}
----
apiVersion: v1
kind: ConfigMap
metadata:
@@ -34,71 +24,10 @@ metadata:
{{- toYaml . | nindent 4 }}
{{- end }}
data:
- application.properties: |-
- {{- with .Values.admin }}
- {{- if .zookeeper.enabled }}
- admin.registry.address: {{ .zookeeper.address }}
- admin.metadata-report.address: {{ .report.zookeeper.address }}
- admin.config-center: {{ .zookeeper.center }}
- {{- end }}
- {{- if .nacos.enabled }}
- admin.registry.address: {{ .nacos.address }}
- admin.registry.group: {{ .nacos.group }}
- admin.registry.namespace: {{ .nacos.namespace }}
- admin.config-center: {{ .nacos.center }}
- admin.config-center.group: {{ .nacos.group }}
- admin.config-center.namespace: {{ .nacos.namespace }}
- admin.metadata-report.address: {{ .report.nacos.address }}
- admin.metadata-report.group: {{ .report.nacos.group }}
- admin.metadata-report.namespace: {{ .report.nacos.namespace }}
- {{- end }}
- {{- end }}
- {{- with .Values.root.user }}
- admin.root.user.name: {{ .name }}
- admin.root.user.password: {{ .password }}
- {{- end }}
- {{- if .Values.check.enabled }}
- {{- if $defaultsessionTimeoutMilli }}
- admin.check.sessionTimeoutMilli: {{ $defaultsessionTimeoutMilli }}
- {{- end }}
- {{- if $defaulttokenTimeoutMilli }}
- admin.check.tokenTimeoutMilli: {{ $defaulttokenTimeoutMilli }}
- {{- end }}
- {{- if $defaultsignSecret }}
- admin.check.signSecret: {{ $defaultsignSecret }}
- {{- end }}
- {{- end }}
- {{- with .Values.apollo }}
- {{- if .enabled }}
- {{- if $apollotoken }}
- admin.apollo.token: {{ $apollotoken }}
- {{- end }}
- admin.apollo.appId: {{ .appid }}
- admin.apollo.env: {{ .env }}
- admin.apollo.cluster: {{ .cluster }}
- admin.config-center: {{ .center }}
- {{- end }}
- {{- end }}
- {{- with .Values.server.compression }}
- server.compression.enabled: {{ .enabled }}
- server.compression.mime-types: {{ .types }}
- server.compression.min-response-size: {{ .size }}
- {{- end }}
- {{- with .Values.dubbo }}
- {{- if $dubboname }}
- dubbo.application.name: {{ $dubboname }}
- {{- end }}
- dubbo.application.logger: {{ .application.logger }}
- dubbo.registry.address: {{ .registry.address }}
- {{- end }}
- {{- with .Values.spring }}
- {{- if .datasource.enabled }}
- spring.datasource.driver-class-name: {{ $springdriverclassname }}
- spring.datasource.url: {{ .datasource.url }}
- spring.datasource.username: {{ .datasource.username }}
- spring.datasource.password: {{ .datasource.password }}
- {{- end }}
- {{- end }}
- {{- with .Values.mybatis }}
- mybatis-plus.global-config.db-config.id-type: {{ .type }}
- {{- end }}
\ No newline at end of file
+ # use this file to override default configuration of `dubbo-admin`
+ #
+ # see conf/admin.yml for available settings
+ admin.yml: |-
+ {{ if .Values.admin }}
+ {{ toYaml .Values.admin | trim | nindent 4 }}
+ {{ end }}
\ No newline at end of file
diff --git a/deploy/charts/dubbo-admin/templates/deployment.yaml b/deploy/charts/dubbo-admin/templates/deployment.yaml
index 2379156e..2e05c8d9 100644
--- a/deploy/charts/dubbo-admin/templates/deployment.yaml
+++ b/deploy/charts/dubbo-admin/templates/deployment.yaml
@@ -113,6 +113,9 @@ spec:
{{- end }}
resources:
{{- toYaml .Values.resources | nindent 12 }}
+ env:
+ - name: ADMIN_CONFIG_PATH
+ value: /config/admin.yml
volumes:
- name: application-properties
configMap:
diff --git a/deploy/charts/dubbo-admin/values.yaml b/deploy/charts/dubbo-admin/values.yaml
index 4fa0565c..8ef5f252 100644
--- a/deploy/charts/dubbo-admin/values.yaml
+++ b/deploy/charts/dubbo-admin/values.yaml
@@ -61,26 +61,24 @@ serviceAccount:
imagePullSecrets: []
# - name: secretName
+## admin configuration
+admin:
+ ## dubbo configuration when admin works as a provider
+ dubbo:
+ ## dubbo enabled
+ enabled: true
-## @param Admin Default Enable Configuration
-dubbo:
- ## dubbo enabled
- enabled: true
-
- ## dubbo application
- application:
- ## dubbo application name
- name: ~
-
- ## dubbo application logger
- logger: slf4j
+ ## dubbo application
+ application:
+ ## dubbo application name
+ name: ~
- registry:
- ## dubbo registry address
- address: ${admin.registry.address}
+ ## dubbo application logger
+ logger: slf4j
-## admin configuration
-admin:
+ registry:
+ ## dubbo registry address
+ address: ${admin.registry.address}
## zookeeper configuration
zookeeper:
## zookeeper enabled
@@ -123,8 +121,11 @@ admin:
## nacos namespace
namespace: public
-
-
+ prometheus:
+ address: prometheus.dubbo-system.svc.cluster.local:3000
+ grafana:
+ address: grafana.dubbo-system.svc.cluster.local
+ mysql-dsn: "root:password@tcp(127.0.0.1:3306)/dubbo-admin?charset=utf8&parseTime=true"
## apollo configuration
apollo:
diff --git a/pkg/admin/config/config.go b/pkg/admin/config/config.go
index 847b4a12..460e59d1 100644
--- a/pkg/admin/config/config.go
+++ b/pkg/admin/config/config.go
@@ -41,7 +41,7 @@ import (
)
const (
- conf = "./conf/dubboadmin.yml"
+ conf = "./conf/admin.yml"
confPathKey = "ADMIN_CONFIG_PATH"
)
diff --git a/pkg/authority/security/server.go b/pkg/authority/security/server.go
index 060f9849..34f6d10c 100644
--- a/pkg/authority/security/server.go
+++ b/pkg/authority/security/server.go
@@ -119,6 +119,7 @@ func (s *Server) Init() {
s.registerCertificateService()
s.registerObserveService()
+ s.registerTrafficService()
reflection.Register(s.SecureServer)
diff --git a/cmd/authority/app/authority.go b/pkg/authority/setup.go
similarity index 50%
copy from cmd/authority/app/authority.go
copy to pkg/authority/setup.go
index c49fa6e1..50377b55 100644
--- a/cmd/authority/app/authority.go
+++ b/pkg/authority/setup.go
@@ -1,34 +1,16 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package app
+package authority
import (
- "flag"
"fmt"
- "os"
- "os/signal"
- "strings"
- "syscall"
-
"github.com/apache/dubbo-admin/pkg/authority/config"
"github.com/apache/dubbo-admin/pkg/authority/security"
- "github.com/apache/dubbo-admin/pkg/logger"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
+ "os"
+ "os/signal"
+ "strings"
+ "syscall"
)
var (
@@ -39,33 +21,6 @@ var (
replaceWithCamelCase = false
)
-func NewAppCommand() *cobra.Command {
- options := config.NewOptions()
-
- cmd := &cobra.Command{
- Use: "authority",
- Long: `The authority app is responsible for controllers in dubbo authority`,
- PersistentPreRun: func(cmd *cobra.Command, args []string) {
- logger.Infof("Authority command PersistentPreRun")
- initialize(cmd)
- },
- Run: func(cmd *cobra.Command, args []string) {
- logger.Infof("Authority command Run with options: %+v", options)
- if errs := options.Validate(); len(errs) != 0 {
- logger.Fatal(errs)
- }
-
- if err := Run(options); err != nil {
- logger.Fatal(err)
- }
- },
- }
-
- cmd.Flags().AddGoFlagSet(flag.CommandLine)
- options.FillFlags(cmd.Flags())
- return cmd
-}
-
func Run(options *config.Options) error {
s := security.NewServer(options)
@@ -82,7 +37,7 @@ func Run(options *config.Options) error {
return nil
}
-func initialize(cmd *cobra.Command) error {
+func Initialize(cmd *cobra.Command) error {
v := viper.New()
// For example, --webhook-port is bound to DUBBO_WEBHOOK_PORT.
diff --git a/pkg/core/alias.go b/pkg/core/alias.go
new file mode 100755
index 00000000..426b67f9
--- /dev/null
+++ b/pkg/core/alias.go
@@ -0,0 +1,43 @@
+package core
+
+import (
+ "context"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "github.com/google/uuid"
+ kube_log "sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+var (
+ // TODO remove dependency on kubernetes see: https://github.com/kumahq/kuma/issues/2798
+ Log = kube_log.Log
+ SetLogger = kube_log.SetLogger
+ Now = time.Now
+ TempDir = os.TempDir
+
+ SetupSignalHandler = func() (context.Context, context.Context) {
+ gracefulCtx, gracefulCancel := context.WithCancel(context.Background())
+ ctx, cancel := context.WithCancel(context.Background())
+ c := make(chan os.Signal, 3)
+ signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
+ go func() {
+ s := <-c
+ Log.Info("Received signal, stopping instance gracefully", "signal", s.String())
+ gracefulCancel()
+ s = <-c
+ Log.Info("Received second signal, stopping instance", "signal", s.String())
+ cancel()
+ s = <-c
+ Log.Info("Received third signal, force exit", "signal", s.String())
+ os.Exit(1)
+ }()
+ return gracefulCtx, ctx
+ }
+)
+
+func NewUUID() string {
+ return uuid.NewString()
+}
diff --git a/pkg/core/cmd/helpers.go b/pkg/core/cmd/helpers.go
new file mode 100755
index 00000000..d6ff1170
--- /dev/null
+++ b/pkg/core/cmd/helpers.go
@@ -0,0 +1,14 @@
+package cmd
+
+import (
+ "fmt"
+ "strings"
+)
+
+func UsageOptions(desc string, options ...interface{}) string {
+ values := make([]string, 0, len(options))
+ for _, option := range options {
+ values = append(values, fmt.Sprintf("%v", option))
+ }
+ return fmt.Sprintf("%s: one of %s", desc, strings.Join(values, "|"))
+}
diff --git a/pkg/core/cmd/util.go b/pkg/core/cmd/util.go
new file mode 100755
index 00000000..7b79464d
--- /dev/null
+++ b/pkg/core/cmd/util.go
@@ -0,0 +1,17 @@
+package cmd
+
+import (
+ "context"
+ "github.com/apache/dubbo-admin/pkg/core"
+)
+
+type RunCmdOpts struct {
+ // The first returned context is closed upon receiving first signal (SIGINT, SIGTERM).
+ // The second returned context is closed upon receiving second signal.
+ // We can start graceful shutdown when first context is closed and forcefully stop when the second one is closed.
+ SetupSignalHandler func() (context.Context, context.Context)
+}
+
+var DefaultRunCmdOpts = RunCmdOpts{
+ SetupSignalHandler: core.SetupSignalHandler,
+}
diff --git a/pkg/authority/security/server.go b/pkg/core/rule/server.go
similarity index 99%
copy from pkg/authority/security/server.go
copy to pkg/core/rule/server.go
index 060f9849..3c3d8210 100644
--- a/pkg/authority/security/server.go
+++ b/pkg/core/rule/server.go
@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package security
+package rule
import (
"crypto/tls"
@@ -119,6 +119,7 @@ func (s *Server) Init() {
s.registerCertificateService()
s.registerObserveService()
+ s.registerTrafficService()
reflection.Register(s.SecureServer)
diff --git a/pkg/core/rule/server_test.go b/pkg/core/rule/server_test.go
new file mode 100644
index 00000000..b52bec83
--- /dev/null
+++ b/pkg/core/rule/server_test.go
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rule
+
+import (
+ "crypto/tls"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/apache/dubbo-admin/pkg/authority/election"
+
+ "k8s.io/client-go/kubernetes"
+
+ cert2 "github.com/apache/dubbo-admin/pkg/authority/cert"
+ "github.com/apache/dubbo-admin/pkg/authority/config"
+ "github.com/apache/dubbo-admin/pkg/authority/k8s"
+ "github.com/apache/dubbo-admin/pkg/logger"
+)
+
+type mockKubeClient struct {
+ k8s.Client
+}
+
+var (
+ certPEM = ""
+ priPEM = ""
+)
+
+func (s *mockKubeClient) Init(options *config.Options) bool {
+ return true
+}
+
+func (s *mockKubeClient) GetAuthorityCert(namespace string) (string, string) {
+ return certPEM, priPEM
+}
+
+func (s *mockKubeClient) UpdateAuthorityCert(cert string, pri string, namespace string) {
+}
+
+func (s *mockKubeClient) UpdateAuthorityPublicKey(cert string) bool {
+ return true
+}
+
+func (s *mockKubeClient) UpdateWebhookConfig(options *config.Options, storage cert2.Storage) {
+}
+
+func (s *mockKubeClient) GetKubClient() *kubernetes.Clientset {
+ return nil
+}
+
+type mockStorage struct {
+ cert2.Storage
+ origin cert2.Storage
+}
+
+func (s *mockStorage) GetServerCert(serverName string) *tls.Certificate {
+ return nil
+}
+
+func (s *mockStorage) RefreshServerCert() {
+}
+
+func (s *mockStorage) SetAuthorityCert(cert *cert2.Cert) {
+ s.origin.SetAuthorityCert(cert)
+}
+
+func (s *mockStorage) GetAuthorityCert() *cert2.Cert {
+ return s.origin.GetAuthorityCert()
+}
+
+func (s *mockStorage) SetRootCert(cert *cert2.Cert) {
+ s.origin.SetRootCert(cert)
+}
+
+func (s *mockStorage) GetRootCert() *cert2.Cert {
+ return s.origin.GetRootCert()
+}
+
+func (s *mockStorage) AddTrustedCert(cert *cert2.Cert) {
+ s.origin.AddTrustedCert(cert)
+}
+
+func (s *mockStorage) GetTrustedCerts() []*cert2.Cert {
+ return s.origin.GetTrustedCerts()
+}
+
+func (s *mockStorage) GetStopChan() chan os.Signal {
+ return s.origin.GetStopChan()
+}
+
+type mockLeaderElection struct {
+ election.LeaderElection
+}
+
+func (s *mockLeaderElection) Election(storage cert2.Storage, options *config.Options, kubeClient *kubernetes.Clientset) error {
+ storage.SetAuthorityCert(cert2.GenerateAuthorityCert(storage.GetRootCert(), options.CaValidity))
+ return nil
+}
+
+func TestInit(t *testing.T) {
+ t.Parallel()
+
+ logger.Init()
+
+ options := &config.Options{
+ IsKubernetesConnected: true,
+ Namespace: "dubbo-system",
+ PlainServerPort: 30060,
+ SecureServerPort: 30062,
+ DebugPort: 30070,
+ CaValidity: 30 * 24 * 60 * 60 * 1000, // 30 day
+ CertValidity: 1 * 60 * 60 * 1000, // 1 hour
+ }
+
+ s := NewServer(options)
+ s.KubeClient = &mockKubeClient{}
+
+ s.Init()
+ if !s.CertStorage.GetAuthorityCert().IsValid() {
+ t.Fatal("Authority cert is not valid")
+ return
+ }
+
+ certPEM = s.CertStorage.GetAuthorityCert().CertPem
+ priPEM = cert2.EncodePrivateKey(s.CertStorage.GetAuthorityCert().PrivateKey)
+
+ s.PlainServer.Stop()
+ s.SecureServer.Stop()
+ s.StopChan <- os.Kill
+
+ s = NewServer(options)
+ s.KubeClient = &mockKubeClient{}
+ s.Init()
+
+ if !s.CertStorage.GetAuthorityCert().IsValid() {
+ t.Fatal("Authority cert is not valid")
+
+ return
+ }
+
+ if s.CertStorage.GetAuthorityCert().CertPem != certPEM {
+ t.Fatal("Authority cert is not equal")
+
+ return
+ }
+
+ s.PlainServer.Stop()
+ s.SecureServer.Stop()
+ s.StopChan <- os.Kill
+ s.CertStorage.GetStopChan() <- os.Kill
+}
+
+func TestRefresh(t *testing.T) {
+ t.Parallel()
+
+ logger.Init()
+
+ options := &config.Options{
+ IsKubernetesConnected: false,
+ Namespace: "dubbo-system",
+ PlainServerPort: 30060,
+ SecureServerPort: 30062,
+ DebugPort: 30070,
+ CaValidity: 10,
+ }
+
+ s := NewServer(options)
+
+ s.KubeClient = &mockKubeClient{}
+ storage := &mockStorage{}
+ s.Elec = &mockLeaderElection{}
+ storage.origin = cert2.NewStorage(options)
+ s.CertStorage = storage
+
+ s.Init()
+
+ origin := s.CertStorage.GetAuthorityCert()
+
+ for i := 0; i < 1000; i++ {
+ // wait at most 100s
+ time.Sleep(100 * time.Millisecond)
+ if s.CertStorage.GetAuthorityCert() != origin {
+ break
+ }
+ }
+
+ if s.CertStorage.GetAuthorityCert() == origin {
+ t.Fatal("Authority cert is not refreshed")
+ return
+ }
+
+ s.PlainServer.Stop()
+ s.SecureServer.Stop()
+ s.StopChan <- os.Kill
+}
diff --git a/pkg/core/runtime/builder.go b/pkg/core/runtime/builder.go
new file mode 100755
index 00000000..43477c87
--- /dev/null
+++ b/pkg/core/runtime/builder.go
@@ -0,0 +1,117 @@
+package runtime
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/dubbo-admin/pkg/core"
+ "github.com/apache/dubbo-admin/pkg/core/runtime/component"
+ "os"
+ "time"
+
+ "github.com/pkg/errors"
+)
+
+// BuilderContext provides access to Builder's interim state.
+//
+// NOTE(review): in this file Builder only defines ComponentManager(), and the
+// package qualifiers used below (core_store, kuma_cp, datasource, ...) are not
+// present in this file's import block. Confirm which accessors are really
+// meant to be part of this interface before relying on it.
+type BuilderContext interface {
+ ComponentManager() component.Manager
+ ResourceStore() core_store.ResourceStore
+ SecretStore() store.SecretStore
+ ConfigStore() core_store.ResourceStore
+ ResourceManager() core_manager.CustomizableResourceManager
+ Config() kuma_cp.Config
+ DataSourceLoader() datasource.Loader
+ Extensions() context.Context
+ ConfigManager() config_manager.ConfigManager
+ LeaderInfo() component.LeaderInfo
+ Metrics() metrics.Metrics
+ EventReaderFactory() events.ListenerFactory
+ APIManager() api_server.APIManager
+ XDSHooks() *xds_hooks.Hooks
+ CAProvider() secrets.CaProvider
+ DpServer() *dp_server.DpServer
+ ResourceValidators() ResourceValidators
+ KDSContext() *kds_context.Context
+ APIServerAuthenticator() authn.Authenticator
+ Access() Access
+ TokenIssuers() builtin.TokenIssuers
+ MeshCache() *mesh.Cache
+ InterCPClientPool() *client.Pool
+}
+
+var _ BuilderContext = &Builder{}
+
+// Builder represents a multi-step initialization process.
+// It collects the pieces that Build later assembles into a Runtime.
+type Builder struct {
+ // cfg is the configuration handed to BuilderFor.
+ cfg kuma_cp.Config
+ // cm is set through WithComponentManager; Build fails while it is nil.
+ cm component.Manager
+ // rs is the resource store copied into the runtime context by Build.
+ rs core_store.ResourceStore
+ // runtimeInfo carries the instance id and start time created in BuilderFor.
+ *runtimeInfo
+}
+
+// BuilderFor creates a Builder for cfg.
+//
+// The instance id stored in the embedded runtimeInfo is "<hostname>-<suffix>",
+// where suffix is the first four characters of a freshly generated UUID.
+// An error is returned when the hostname cannot be determined.
+//
+// Only fields that Builder actually declares are initialised. appCtx is kept
+// in the signature for callers, but Builder currently has no field to hold it.
+func BuilderFor(appCtx context.Context, cfg kuma_cp.Config) (*Builder, error) {
+	hostname, err := os.Hostname()
+	if err != nil {
+		return nil, errors.Wrap(err, "could not get hostname")
+	}
+	suffix := core.NewUUID()[0:4]
+	return &Builder{
+		cfg: cfg,
+		runtimeInfo: &runtimeInfo{
+			instanceId: fmt.Sprintf("%s-%s", hostname, suffix),
+			startTime:  time.Now(),
+		},
+	}, nil
+}
+
+// WithComponentManager stores cm on the builder and returns the same builder
+// so that calls can be chained.
+func (b *Builder) WithComponentManager(cm component.Manager) *Builder {
+ b.cm = cm
+ return b
+}
+
+// Build assembles a Runtime from the state collected by the builder.
+//
+// It returns an error when no component manager has been configured.
+// Only fields declared on both Builder and runtimeContext are copied:
+// the configuration and the resource store. The runtime context's resource
+// manager is left at its zero value because Builder has nothing to supply it.
+func (b *Builder) Build() (Runtime, error) {
+	if b.cm == nil {
+		return nil, errors.New("ComponentManager has not been configured")
+	}
+	return &runtime{
+		RuntimeInfo: b.runtimeInfo,
+		RuntimeContext: &runtimeContext{
+			cfg: b.cfg,
+			rs:  b.rs,
+		},
+		Manager: b.cm,
+	}, nil
+}
+
+// ComponentManager returns the component manager set via WithComponentManager,
+// or nil when none has been set yet.
+func (b *Builder) ComponentManager() component.Manager {
+ return b.cm
+}
diff --git a/pkg/core/runtime/component/component.go b/pkg/core/runtime/component/component.go
new file mode 100755
index 00000000..d3f6a21b
--- /dev/null
+++ b/pkg/core/runtime/component/component.go
@@ -0,0 +1,163 @@
+package component
+
+import (
+ "sync"
+
+ "github.com/kumahq/kuma/pkg/core"
+ "github.com/kumahq/kuma/pkg/util/channels"
+)
+
+var log = core.Log.WithName("bootstrap")
+
+// Component defines a process that will be run in the application.
+// A Component should be designed so that it can be stopped through the stop
+// channel and started again later (for example when the instance is re-elected
+// as leader).
+type Component interface {
+ // Start blocks until the channel is closed or an error occurs.
+ // The component will stop running when the channel is closed.
+ Start(<-chan struct{}) error
+
+ // NeedLeaderElection reports whether the component must run on only one
+ // instance at a time, even when several replicas of the application exist.
+ NeedLeaderElection() bool
+}
+
+// GracefulComponent is a component that supports waiting until it has finished.
+// It is useful if there is cleanup logic that has to be executed before the
+// process exits (e.g. sending SIGTERM signals to subprocesses started by this
+// component).
+type GracefulComponent interface {
+ Component
+
+ // WaitForDone blocks until the component is done.
+ // If the component was never started (e.g. a leader component on an instance
+ // that is not the leader) it returns immediately.
+ WaitForDone()
+}
+
+// Compile-time check that ComponentFunc satisfies Component (a component is e.g. a gRPC server, an HTTP server or a reconciliation loop).
+var _ Component = ComponentFunc(nil)
+
+// ComponentFunc adapts a plain function to the Component interface.
+// Components created this way never require leader election.
+type ComponentFunc func(<-chan struct{}) error
+
+// NeedLeaderElection always reports false for a ComponentFunc.
+func (fn ComponentFunc) NeedLeaderElection() bool {
+	return false
+}
+
+// Start invokes the wrapped function with stop and returns its result.
+func (fn ComponentFunc) Start(stop <-chan struct{}) error {
+	result := fn(stop)
+	return result
+}
+
+var _ Component = LeaderComponentFunc(nil)
+
+// LeaderComponentFunc adapts a plain function to the Component interface.
+// Components created this way always require leader election.
+type LeaderComponentFunc func(<-chan struct{}) error
+
+// NeedLeaderElection always reports true for a LeaderComponentFunc.
+func (fn LeaderComponentFunc) NeedLeaderElection() bool {
+	return true
+}
+
+// Start invokes the wrapped function with stop and returns its result.
+func (fn LeaderComponentFunc) Start(stop <-chan struct{}) error {
+	result := fn(stop)
+	return result
+}
+
+// Manager registers components and runs them together.
+type Manager interface {
+
+ // Add registers a component, i.e. gRPC Server, HTTP server, reconciliation loop.
+ Add(...Component) error
+
+ // Start starts registered components and blocks until the Stop channel is closed.
+ // Returns an error if there is an error starting any component.
+ // If there are any GracefulComponent, it waits until all components are done.
+ Start(<-chan struct{}) error
+}
+
+var _ Manager = &manager{}
+
+// NewManager returns a Manager with no registered components that uses
+// leaderElector to decide when leader-only components run.
+func NewManager(leaderElector LeaderElector) Manager {
+	m := new(manager)
+	m.leaderElector = leaderElector
+	return m
+}
+
+// manager is the default Manager implementation.
+type manager struct {
+ // components holds everything registered through Add, in registration order.
+ components []Component
+ // leaderElector drives the start/stop of components that need leader election.
+ leaderElector LeaderElector
+}
+
+// Add appends the given components to the manager. It always returns nil.
+func (cm *manager) Add(c ...Component) error {
+ cm.components = append(cm.components, c...)
+ return nil
+}
+
+// waitForDone calls WaitForDone, in registration order, on every registered
+// component that implements GracefulComponent.
+func (cm *manager) waitForDone() {
+	for _, registered := range cm.components {
+		graceful, isGraceful := registered.(GracefulComponent)
+		if !isGraceful {
+			continue
+		}
+		graceful.WaitForDone()
+	}
+}
+
+// Start launches all registered components and blocks until either stop is
+// closed (returning nil) or one of the components reports an error (returning
+// that error). Before returning it waits for graceful components to finish.
+func (cm *manager) Start(stop <-chan struct{}) error {
+	failures := make(chan error)
+
+	cm.startNonLeaderComponents(stop, failures)
+	cm.startLeaderComponents(stop, failures)
+
+	defer cm.waitForDone()
+
+	var result error
+	select {
+	case result = <-failures:
+	case <-stop:
+	}
+	return result
+}
+
+// startNonLeaderComponents starts, each in its own goroutine, every registered
+// component that does not need leader election.
+//
+// A non-nil error returned by a component is forwarded on errCh. The send is
+// abandoned once stop is closed, so the goroutine cannot block forever on the
+// unbuffered channel after the manager has stopped listening.
+func (cm *manager) startNonLeaderComponents(stop <-chan struct{}, errCh chan error) {
+	for _, component := range cm.components {
+		if !component.NeedLeaderElection() {
+			go func(c Component) {
+				if err := c.Start(stop); err != nil {
+					select {
+					case errCh <- err:
+					case <-stop:
+					}
+				}
+			}(component)
+		}
+	}
+}
+
+// startLeaderComponents wires the leader-only components to the leader elector.
+//
+// Each time leadership is acquired, every component that needs leader election
+// is started in its own goroutine with a fresh stop channel. That channel is
+// closed when leadership is lost or when stop is closed. Errors returned by
+// components are forwarded on errCh; the send is abandoned once stop is closed.
+func (cm *manager) startLeaderComponents(stop <-chan struct{}, errCh chan error) {
+	// leaderStopCh is replaced on every election, so it is guarded by a mutex:
+	// it is written by the leader elector goroutine and read by the goroutine
+	// started at the end of this function.
+	// A separate channel is needed for leader components because they can be restarted.
+	var mutex sync.Mutex
+	leaderStopCh := make(chan struct{})
+	closeLeaderCh := func() {
+		mutex.Lock()
+		defer mutex.Unlock()
+		if !channels.IsClosed(leaderStopCh) {
+			close(leaderStopCh)
+		}
+	}
+
+	cm.leaderElector.AddCallbacks(LeaderCallbacks{
+		OnStartedLeading: func() {
+			log.Info("leader acquired")
+			mutex.Lock()
+			defer mutex.Unlock()
+			leaderStopCh = make(chan struct{})
+			// Snapshot the channel while holding the mutex. The goroutines
+			// below must not read the shared variable later, because a
+			// subsequent election may replace it concurrently.
+			leaderStop := leaderStopCh
+			for _, component := range cm.components {
+				if component.NeedLeaderElection() {
+					go func(c Component) {
+						if err := c.Start(leaderStop); err != nil {
+							select {
+							case errCh <- err:
+							case <-stop:
+							}
+						}
+					}(component)
+				}
+			}
+		},
+		OnStoppedLeading: func() {
+			log.Info("leader lost")
+			closeLeaderCh()
+		},
+	})
+	go cm.leaderElector.Start(stop)
+	go func() {
+		<-stop
+		closeLeaderCh()
+	}()
+}
diff --git a/pkg/core/runtime/component/leader.go b/pkg/core/runtime/component/leader.go
new file mode 100755
index 00000000..3edbc4e9
--- /dev/null
+++ b/pkg/core/runtime/component/leader.go
@@ -0,0 +1,53 @@
+package component
+
+import "sync/atomic"
+
+// LeaderCallbacks defines callbacks for events from a LeaderElector.
+// It is guaranteed that all callbacks are executed from the same goroutine, so
+// only one callback runs at a time.
+type LeaderCallbacks struct {
+ // OnStartedLeading is invoked when this instance becomes the leader.
+ OnStartedLeading func()
+ // OnStoppedLeading is invoked when this instance stops being the leader.
+ OnStoppedLeading func()
+}
+
+// LeaderElector decides whether this instance is the leader and notifies the
+// registered callbacks about changes.
+type LeaderElector interface {
+ // AddCallbacks registers callbacks to be invoked on leadership changes.
+ AddCallbacks(LeaderCallbacks)
+ // IsLeader should be used for diagnostic reasons (metrics/API info), because there may not be any leader elector for a short period of time.
+ // Use Callbacks to write logic to execute when Leader is elected.
+ IsLeader() bool
+
+ // Start blocks until the channel is closed or an error occurs.
+ Start(stop <-chan struct{})
+}
+
+// LeaderInfo exposes read-only information about leadership.
+type LeaderInfo interface {
+ // IsLeader reports whether this instance currently considers itself the leader.
+ IsLeader() bool
+}
+
+var _ LeaderInfo = &LeaderInfoComponent{}
+var _ Component = &LeaderInfoComponent{}
+
+// LeaderInfoComponent is a leader-only component that records whether it is
+// currently running. Because it is started only while this instance holds
+// leadership, IsLeader reflects the leadership state.
+type LeaderInfoComponent struct {
+	// leader is 1 while Start is running and 0 otherwise; accessed atomically.
+	leader int32
+}
+
+// Start marks the instance as leader, blocks until stop is closed, then clears
+// the leader flag again. It always returns nil.
+func (l *LeaderInfoComponent) Start(stop <-chan struct{}) error {
+	l.setLeader(true)
+	<-stop
+	l.setLeader(false)
+	return nil
+}
+
+// NeedLeaderElection always reports true: the component must only run on the leader.
+func (l *LeaderInfoComponent) NeedLeaderElection() bool {
+	return true
+}
+
+// setLeader atomically stores the leader flag.
+func (l *LeaderInfoComponent) setLeader(leader bool) {
+	var value int32
+	if leader {
+		value = 1
+	}
+	atomic.StoreInt32(&l.leader, value)
+}
+
+// IsLeader atomically reads the leader flag.
+func (l *LeaderInfoComponent) IsLeader() bool {
+	return atomic.LoadInt32(&l.leader) == 1
+}
diff --git a/pkg/core/runtime/component/resilient.go b/pkg/core/runtime/component/resilient.go
new file mode 100755
index 00000000..8167f519
--- /dev/null
+++ b/pkg/core/runtime/component/resilient.go
@@ -0,0 +1,60 @@
+package component
+
+import (
+ "time"
+
+ "github.com/go-logr/logr"
+ "github.com/pkg/errors"
+)
+
+const (
+ // backoffTime is how long resilientComponent.Start waits after the wrapped
+ // component has terminated before starting it again.
+ backoffTime = 5 * time.Second
+)
+
+// resilientComponent wraps another Component and starts it again whenever it
+// terminates, including when it panics.
+type resilientComponent struct {
+ // log receives start, stop and failure messages.
+ log logr.Logger
+ // component is the wrapped component.
+ component Component
+}
+
+// NewResilientComponent wraps component so that it is started again after it
+// terminates; log is used for the wrapper's own messages.
+func NewResilientComponent(log logr.Logger, component Component) Component {
+	wrapped := new(resilientComponent)
+	wrapped.log = log
+	wrapped.component = component
+	return wrapped
+}
+
+// Start runs the wrapped component in a loop until stop is closed.
+//
+// Every run ("generation") executes in its own goroutine. A panic inside the
+// component is recovered and treated like a returned error. When a run ends,
+// its error (if any) is logged together with the generation number, and the
+// component is started again after backoffTime. Closing stop ends the loop at
+// any point, including during the back-off wait. Start always returns nil.
+func (r *resilientComponent) Start(stop <-chan struct{}) error {
+	r.log.Info("starting resilient component ...")
+	for generationID := uint64(1); ; generationID++ {
+		// Buffered so the goroutine can deliver its single result even if
+		// nobody is receiving any more.
+		errCh := make(chan error, 1)
+		go func(errCh chan<- error) {
+			defer close(errCh)
+			// recover from a panic
+			defer func() {
+				if e := recover(); e != nil {
+					if err, ok := e.(error); ok {
+						errCh <- err
+					} else {
+						errCh <- errors.Errorf("%v", e)
+					}
+				}
+			}()
+
+			errCh <- r.component.Start(stop)
+		}(errCh)
+		select {
+		case <-stop:
+			r.log.Info("done")
+			return nil
+		case err := <-errCh:
+			if err != nil {
+				r.log.WithValues("generationID", generationID).Error(err, "component terminated with an error")
+			}
+		}
+		// Back off before restarting, but do not delay shutdown.
+		select {
+		case <-stop:
+			r.log.Info("done")
+			return nil
+		case <-time.After(backoffTime):
+		}
+	}
+}
+
+// NeedLeaderElection reports whatever the wrapped component reports.
+func (r *resilientComponent) NeedLeaderElection() bool {
+ return r.component.NeedLeaderElection()
+}
diff --git a/pkg/core/runtime/reports/reports.go b/pkg/core/runtime/reports/reports.go
new file mode 100755
index 00000000..29219a1e
--- /dev/null
+++ b/pkg/core/runtime/reports/reports.go
@@ -0,0 +1,277 @@
+package reports
+
+import (
+ "context"
+ "crypto/tls"
+ "fmt"
+ "net"
+ "os"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/pkg/errors"
+
+ mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
+ kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp"
+ config_core "github.com/kumahq/kuma/pkg/config/core"
+ "github.com/kumahq/kuma/pkg/core"
+ "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
+ "github.com/kumahq/kuma/pkg/core/resources/apis/system"
+ "github.com/kumahq/kuma/pkg/core/resources/registry"
+ core_runtime "github.com/kumahq/kuma/pkg/core/runtime"
+ "github.com/kumahq/kuma/pkg/core/user"
+ kuma_version "github.com/kumahq/kuma/pkg/version"
+)
+
+const (
+ // pingInterval is the number of seconds between periodic reports
+ // (it is multiplied by time.Second in startReportTicker).
+ pingInterval = 3600
+ // pingHost and pingPort are the endpoint that startReportTicker passes to dispatch.
+ // NOTE(review): this host presumably comes from the upstream Kuma code this
+ // file imports from; confirm it is an intended destination for this project.
+ pingHost = "kong-hf.konghq.com"
+ pingPort = 61832
+)
+
+var (
+ log = core.Log.WithName("core").WithName("reports")
+)
+
+/*
+ reportsBuffer holds the key/value pairs that make up a report.
+ - the buffer is initialized by Init
+ - Append adds more keys to the mutable part
+*/
+
+type reportsBuffer struct {
+ // The embedded mutex is taken by Append; the other methods in this file
+ // access the maps without locking.
+ sync.Mutex
+ // mutable holds values that are rewritten before each dispatch.
+ mutable map[string]string
+ // immutable holds values written by initImmutable.
+ immutable map[string]string
+}
+
+// fetchDataplanes lists all dataplane resources through the runtime's
+// read-only resource manager.
+func fetchDataplanes(ctx context.Context, rt core_runtime.Runtime) (*mesh.DataplaneResourceList, error) {
+	list := &mesh.DataplaneResourceList{}
+	err := rt.ReadOnlyResourceManager().List(ctx, list)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not fetch dataplanes")
+	}
+	return list, nil
+}
+
+// fetchMeshes lists all mesh resources through the runtime's read-only
+// resource manager.
+func fetchMeshes(ctx context.Context, rt core_runtime.Runtime) (*mesh.MeshResourceList, error) {
+	list := &mesh.MeshResourceList{}
+	err := rt.ReadOnlyResourceManager().List(ctx, list)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not fetch meshes")
+	}
+	return list, nil
+}
+
+// fetchZones lists all zone resources through the runtime's read-only
+// resource manager.
+func fetchZones(ctx context.Context, rt core_runtime.Runtime) (*system.ZoneResourceList, error) {
+	list := &system.ZoneResourceList{}
+	err := rt.ReadOnlyResourceManager().List(ctx, list)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not fetch zones")
+	}
+	return list, nil
+}
+
+// fetchNumPolicies counts the items of every resource type known to the global
+// registry. The result maps "n_<lower-cased type name>" to the count rendered
+// as a decimal string. The first listing failure aborts the whole operation.
+func fetchNumPolicies(ctx context.Context, rt core_runtime.Runtime) (map[string]string, error) {
+	counts := make(map[string]string)
+
+	for _, descriptor := range registry.Global().ObjectDescriptors() {
+		items := descriptor.NewList()
+		key := "n_" + strings.ToLower(string(descriptor.Name))
+		err := rt.ReadOnlyResourceManager().List(ctx, items)
+		if err != nil {
+			return nil, errors.Wrapf(err, "could not fetch %s", key)
+		}
+		counts[key] = strconv.Itoa(len(items.GetItems()))
+	}
+	return counts, nil
+}
+
+// fetchNumOfServices returns the number of internal services (the sum of the
+// service entries of all service insights) followed by the number of external
+// service resources.
+func fetchNumOfServices(ctx context.Context, rt core_runtime.Runtime) (int, int, error) {
+	var insightList mesh.ServiceInsightResourceList
+	if err := rt.ReadOnlyResourceManager().List(ctx, &insightList); err != nil {
+		return 0, 0, errors.Wrap(err, "could not fetch service insights")
+	}
+	var internal int
+	for _, item := range insightList.Items {
+		internal += len(item.Spec.Services)
+	}
+
+	var externalList mesh.ExternalServiceResourceList
+	if err := rt.ReadOnlyResourceManager().List(ctx, &externalList); err != nil {
+		return 0, 0, errors.Wrap(err, "could not fetch external services")
+	}
+	external := len(externalList.Items)
+	return internal, external, nil
+}
+
+// marshall renders the buffer as a single string: the literal prefix "<14>"
+// followed by "key=value;" for every immutable entry and then for every
+// mutable entry. Entries within each map appear in map iteration order.
+func (b *reportsBuffer) marshall() (string, error) {
+	var out strings.Builder
+
+	if _, err := fmt.Fprintf(&out, "<14>"); err != nil {
+		return "", err
+	}
+
+	// writePairs appends every entry of pairs in "key=value;" form.
+	writePairs := func(pairs map[string]string) error {
+		for key, value := range pairs {
+			if _, err := fmt.Fprintf(&out, "%s=%s;", key, value); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+
+	if err := writePairs(b.immutable); err != nil {
+		return "", err
+	}
+	if err := writePairs(b.mutable); err != nil {
+		return "", err
+	}
+
+	return out.String(), nil
+}
+
+// updateEntitiesReport refreshes the entity counters in the mutable part of
+// the buffer: dataplanes, gateways, meshes, zones, services and per-type
+// resource counts. It returns the first error hit while fetching any of them;
+// counters written before the failure stay in the buffer.
+//
+// XXX this function retrieves all dataplanes and all meshes;
+// ideally, the number of dataplanes and number of meshes
+// should be pushed from the outside rather than pulled
+func (b *reportsBuffer) updateEntitiesReport(rt core_runtime.Runtime) error {
+ ctx := user.Ctx(context.Background(), user.ControlPlane)
+ dps, err := fetchDataplanes(ctx, rt)
+ if err != nil {
+ return err
+ }
+ b.mutable["dps_total"] = strconv.Itoa(len(dps.Items))
+
+ // Count dataplanes that carry a gateway section, overall and per gateway type.
+ ngateways := 0
+ gatewayTypes := map[string]int{}
+ for _, dp := range dps.Items {
+ // NOTE(review): unchecked type assertion; it panics if a spec is not a
+ // *mesh_proto.Dataplane.
+ spec := dp.GetSpec().(*mesh_proto.Dataplane)
+ gateway := spec.GetNetworking().GetGateway()
+ if gateway != nil {
+ ngateways++
+ gatewayType := strings.ToLower(gateway.GetType().String())
+ gatewayTypes["gateway_dp_type_"+gatewayType] += 1
+ }
+ }
+ b.mutable["gateway_dps"] = strconv.Itoa(ngateways)
+ for gtype, n := range gatewayTypes {
+ b.mutable[gtype] = strconv.Itoa(n)
+ }
+
+ meshes, err := fetchMeshes(ctx, rt)
+ if err != nil {
+ return err
+ }
+ b.mutable["meshes_total"] = strconv.Itoa(len(meshes.Items))
+
+ // Standalone mode reports a fixed zone count of 1, Global mode counts the
+ // zone resources; any other mode leaves "zones_total" untouched.
+ switch rt.Config().Mode {
+ case config_core.Standalone:
+ b.mutable["zones_total"] = strconv.Itoa(1)
+ case config_core.Global:
+ zones, err := fetchZones(ctx, rt)
+ if err != nil {
+ return err
+ }
+ b.mutable["zones_total"] = strconv.Itoa(len(zones.Items))
+ }
+
+ internalServices, externalServices, err := fetchNumOfServices(ctx, rt)
+ if err != nil {
+ return err
+ }
+ b.mutable["internal_services"] = strconv.Itoa(internalServices)
+ b.mutable["external_services"] = strconv.Itoa(externalServices)
+ b.mutable["services_total"] = strconv.Itoa(internalServices + externalServices)
+
+ policyCounts, err := fetchNumPolicies(ctx, rt)
+ if err != nil {
+ return err
+ }
+
+ // Copy the per-type counts ("n_<type>") into the report.
+ for k, count := range policyCounts {
+ b.mutable[k] = count
+ }
+ return nil
+}
+
+// dispatch refreshes the report, serialises it and sends it to host:port over
+// a TLS connection.
+//
+// pingType is stored under the "signal" key. When extraFn is non-nil, the
+// values it returns are appended to the mutable part of the buffer. Any error
+// from refreshing, from extraFn, from marshalling, dialling or writing is
+// returned. The connection is always closed before returning.
+func (b *reportsBuffer) dispatch(rt core_runtime.Runtime, host string, port int, pingType string, extraFn core_runtime.ExtraReportsFn) error {
+	if err := b.updateEntitiesReport(rt); err != nil {
+		return err
+	}
+	b.mutable["signal"] = pingType
+	b.mutable["cluster_id"] = rt.GetClusterId()
+	b.mutable["uptime"] = strconv.FormatInt(int64(time.Since(rt.GetStartTime())/time.Second), 10)
+	if extraFn != nil {
+		valMap, err := extraFn(rt)
+		if err != nil {
+			return err
+		}
+		b.Append(valMap)
+	}
+	pingData, err := b.marshall()
+	if err != nil {
+		return err
+	}
+
+	conf := &tls.Config{}
+	conn, err := tls.Dial("tcp", net.JoinHostPort(host,
+		strconv.FormatUint(uint64(port), 10)), conf)
+	if err != nil {
+		return err
+	}
+	// Release the socket once the report has been written. A close error is
+	// not actionable here, so it is deliberately ignored.
+	defer func() { _ = conn.Close() }()
+
+	if _, err := fmt.Fprint(conn, pingData); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// Append merges info into the mutable portion of the reports buffer while
+// holding the buffer's lock. Existing keys are overwritten.
+func (b *reportsBuffer) Append(info map[string]string) {
+	b.Lock()
+	defer b.Unlock()
+
+	for k := range info {
+		b.mutable[k] = info[k]
+	}
+}
+
+// initImmutable fills the immutable part of the buffer with the build version,
+// product name, instance id, store type and mode. The hostname is added only
+// when it can be determined.
+func (b *reportsBuffer) initImmutable(rt core_runtime.Runtime) {
+	fixed := b.immutable
+	fixed["version"] = kuma_version.Build.Version
+	fixed["product"] = kuma_version.Product
+	fixed["unique_id"] = rt.GetInstanceId()
+	fixed["backend"] = rt.Config().Store.Type
+	fixed["mode"] = rt.Config().Mode
+
+	if name, err := os.Hostname(); err == nil {
+		fixed["hostname"] = name
+	}
+}
+
+// startReportTicker launches a goroutine that sends one "start" report
+// immediately and then one "ping" report every pingInterval seconds.
+// Failures are only logged at verbosity level 2; the goroutine never exits.
+func startReportTicker(rt core_runtime.Runtime, buffer *reportsBuffer, extraFn core_runtime.ExtraReportsFn) {
+	// send dispatches a single report and logs a failure instead of returning it.
+	send := func(signal string) {
+		if err := buffer.dispatch(rt, pingHost, pingPort, signal, extraFn); err != nil {
+			log.V(2).Info("failed sending usage info", "cause", err.Error())
+		}
+	}
+
+	go func() {
+		send("start")
+		ticks := time.Tick(time.Second * pingInterval)
+		for range ticks {
+			send("ping")
+		}
+	}()
+}
+
+// Init prepares the reports buffer for rt and, when reports are enabled in
+// cfg, starts the periodic reporting goroutine.
+func Init(rt core_runtime.Runtime, cfg kuma_cp.Config, extraFn core_runtime.ExtraReportsFn) {
+	buffer := &reportsBuffer{
+		immutable: map[string]string{},
+		mutable:   map[string]string{},
+	}
+
+	buffer.initImmutable(rt)
+
+	if !cfg.Reports.Enabled {
+		return
+	}
+	startReportTicker(rt, buffer, extraFn)
+}
diff --git a/pkg/core/runtime/runtime.go b/pkg/core/runtime/runtime.go
new file mode 100755
index 00000000..76f2e091
--- /dev/null
+++ b/pkg/core/runtime/runtime.go
@@ -0,0 +1,87 @@
+package runtime
+
+import (
+ "github.com/apache/dubbo-admin/pkg/core/runtime/component"
+ "sync"
+ "time"
+)
+
+// Runtime represents initialized application state.
+// It combines instance information, shared context and the component manager.
+type Runtime interface {
+ RuntimeInfo
+ RuntimeContext
+ component.Manager
+}
+
+// RuntimeInfo exposes identifying information about the running instance.
+type RuntimeInfo interface {
+ // GetInstanceId returns the id of this instance.
+ GetInstanceId() string
+ // SetClusterId stores the id of the cluster this instance belongs to.
+ SetClusterId(clusterId string)
+ // GetClusterId returns the value last passed to SetClusterId.
+ GetClusterId() string
+ // GetStartTime returns the time recorded when the instance was created.
+ GetStartTime() time.Time
+}
+
+// RuntimeContext gives access to shared dependencies of the runtime.
+//
+// NOTE(review): the kuma_cp, datasource and core_manager qualifiers are not
+// present in this file's import block; confirm the intended packages.
+type RuntimeContext interface {
+ Config() kuma_cp.Config
+ DataSourceLoader() datasource.Loader
+ ResourceManager() core_manager.ResourceManager
+}
+
+// ExtraReportsFn produces additional key/value pairs for a Runtime, or an error.
+type ExtraReportsFn func(Runtime) (map[string]string, error)
+
+var _ Runtime = &runtime{}
+
+type runtime struct {
+ RuntimeInfo
+ RuntimeContext
+ component.Manager
+}
+
+var _ RuntimeInfo = &runtimeInfo{}
+
+// runtimeInfo is the default RuntimeInfo implementation.
+type runtimeInfo struct {
+	// mtx guards clusterId, the only field that changes after construction.
+	mtx sync.RWMutex
+
+	instanceId string
+	clusterId  string
+	startTime  time.Time
+}
+
+// GetInstanceId returns the instance id set at construction time.
+func (info *runtimeInfo) GetInstanceId() string {
+	return info.instanceId
+}
+
+// SetClusterId replaces the stored cluster id under the write lock.
+func (info *runtimeInfo) SetClusterId(clusterId string) {
+	info.mtx.Lock()
+	info.clusterId = clusterId
+	info.mtx.Unlock()
+}
+
+// GetClusterId returns the stored cluster id under the read lock.
+func (info *runtimeInfo) GetClusterId() string {
+	info.mtx.RLock()
+	id := info.clusterId
+	info.mtx.RUnlock()
+	return id
+}
+
+// GetStartTime returns the start time set at construction time.
+func (info *runtimeInfo) GetStartTime() time.Time {
+	return info.startTime
+}
+
+var _ RuntimeContext = &runtimeContext{}
+
+// runtimeContext is the default holder of the runtime's shared dependencies.
+//
+// NOTE(review): RuntimeContext also declares DataSourceLoader(), which this
+// type does not define in this file; ResourceStore() below is not part of
+// that interface. Confirm which set of accessors is intended.
+type runtimeContext struct {
+ cfg kuma_cp.Config
+ rm core_manager.ResourceManager
+ rs core_store.ResourceStore
+}
+
+// Config returns the stored configuration.
+func (rc *runtimeContext) Config() kuma_cp.Config {
+ return rc.cfg
+}
+
+// ResourceManager returns the stored resource manager.
+func (rc *runtimeContext) ResourceManager() core_manager.ResourceManager {
+ return rc.rm
+}
+
+// ResourceStore returns the stored resource store.
+func (rc *runtimeContext) ResourceStore() core_store.ResourceStore {
+ return rc.rs
+}
diff --git a/pkg/version/version.go b/pkg/version/version.go
index 40ac63dc..73c9451d 100644
--- a/pkg/version/version.go
+++ b/pkg/version/version.go
@@ -20,6 +20,7 @@ package version
import (
"encoding/json"
"fmt"
+ "github.com/spf13/cobra"
"runtime"
)
@@ -60,3 +61,26 @@ func GetVersionInfo() string {
result, _ := json.Marshal(version)
return string(result)
}
+
+// NewVersionCmd builds the "version" sub-command.
+// By default it prints the git version; with --detailed (-a) it prints the
+// full version information returned by GetVersionInfo.
+func NewVersionCmd() *cobra.Command {
+	var detailed bool
+
+	versionCmd := &cobra.Command{
+		Use:   "version",
+		Short: "Print version",
+		Long:  `Print version.`,
+		RunE: func(c *cobra.Command, _ []string) error {
+			if !detailed {
+				c.Printf("%s\n", gitVersion)
+				return nil
+			}
+			c.Println(GetVersionInfo())
+			return nil
+		},
+	}
+	// flags
+	versionCmd.PersistentFlags().BoolVarP(&detailed, "detailed", "a", false, "Print detailed version")
+	return versionCmd
+}