You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ze...@apache.org on 2020/03/24 07:36:26 UTC
[servicecomb-service-center] branch master updated: Improve syncer
configuration file
This is an automated email from the ASF dual-hosted git repository.
zenlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new e851bb5 Improve syncer configuration file
e851bb5 is described below
commit e851bb5de3ddc31d44a5136eb989193ae42b1562
Author: chinx <c5...@126.com>
AuthorDate: Tue Feb 4 15:08:51 2020 +0800
Improve syncer configuration file
---
etc/conf/syncer.yaml | 56 ++++++++
go.mod | 4 +-
syncer/cmd/daemon.go | 44 +++---
syncer/config/config.go | 155 +++++---------------
syncer/config/config_test.go | 218 ++++++++++++++++++++++++++---
syncer/config/const.go | 111 +++++++++++++++
syncer/config/merge.go | 131 +++++++++++++++++
syncer/config/tlsconfig.go | 191 -------------------------
syncer/config/verify.go | 203 +++++++++++++++++++++++++++
syncer/pkg/utils/addr.go | 17 ++-
syncer/server/convert.go | 111 +++++++++++++++
syncer/server/handler.go | 10 +-
syncer/server/server.go | 57 +++++---
syncer/servicecenter/servicecenter_test.go | 4 +-
14 files changed, 939 insertions(+), 373 deletions(-)
diff --git a/etc/conf/syncer.yaml b/etc/conf/syncer.yaml
new file mode 100644
index 0000000..45b36c1
--- /dev/null
+++ b/etc/conf/syncer.yaml
@@ -0,0 +1,56 @@
+# run mode, supports (single, cluster)
+mode: signle
+# node name, must be unique on the network
+node: syncer-node
+# Cluster name, clustering by this name
+cluster: syncer-cluster
+dataDir: ./syncer-data/
+listener:
+ # Address used to network with other Syncers in LAN
+ bindAddr: 0.0.0.0:30190
+ # Address used to network with other Syncers in WAN
+ advertiseAddr: ""
+ # Address used to synchronize data with other Syncers
+ rpcAddr: 0.0.0.0:30191
+ # Address used to communicate with other cluster peers
+ peerAddr: 127.0.0.1:30192
+ tlsMount:
+ enabled: false
+ name: syncer
+join:
+ enabled: false
+ # Address to join the network by specifying at least one existing member
+ address: 127.0.0.1:30190
+ # Limit the maximum of RetryJoin, default is 0, means no limit
+ retryMax: 3
+ retryInterval: 30s
+task:
+ kind: ticker
+ params:
+ # Time interval between timing tasks, default is 30s
+ - key: interval
+ value: 30s
+registry:
+ plugin: servicecenter
+ address: http://127.0.0.1:30100
+ tlsMount:
+ enabled: false
+ name: servicecenter
+tlsConfigs:
+ - name: syncer
+ verifyPeer: true
+ minVersion: TLSv1.2
+ caFile: ./certs/trust.cer
+ certFile: ./certs/server.cer
+ keyFile: ./certs/server_key.pem
+ passphrase: ""
+ ciphers:
+ - TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
+ - TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
+ - TLS_RSA_WITH_AES_256_GCM_SHA384
+ - TLS_RSA_WITH_AES_128_GCM_SHA256
+ - name: servicecenter
+ verifyPeer: false
+ caFile: ./certs/trust.cer
+ certFile: ./certs/server.cer
+ keyFile: ./certs/server_key.pem
\ No newline at end of file
diff --git a/go.mod b/go.mod
index ebc0929..a40bbc3 100644
--- a/go.mod
+++ b/go.mod
@@ -73,8 +73,9 @@ require (
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492 // indirect
github.com/opentracing/opentracing-go v1.0.2
github.com/openzipkin/zipkin-go-opentracing v0.3.3-0.20180123190626-6bb822a7f15f
+ github.com/pborman/uuid v1.2.0 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
- github.com/pkg/errors v0.8.1 // indirect
+ github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.8.1-0.20170628125436-ab4214782d02
github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e // indirect
@@ -86,6 +87,7 @@ require (
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/spf13/cobra v0.0.0-20170624150100-4d647c8944eb
github.com/spf13/pflag v1.0.0
+ github.com/stretchr/testify v1.3.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/ugorji/go v1.1.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
diff --git a/syncer/cmd/daemon.go b/syncer/cmd/daemon.go
index 8596a41..942330c 100644
--- a/syncer/cmd/daemon.go
+++ b/syncer/cmd/daemon.go
@@ -26,7 +26,7 @@ import (
)
var (
- conf = config.DefaultConfig()
+ conf = &config.Config{}
configFile = ""
)
@@ -42,28 +42,28 @@ func init() {
syncerCmd.Flags().StringVar(&conf.Mode, "mode", conf.Mode,
"run mode")
- syncerCmd.Flags().StringVar(&conf.NodeName, "node", conf.NodeName,
+ syncerCmd.Flags().StringVar(&conf.Node, "node", conf.Node,
"node name")
- syncerCmd.Flags().StringVar(&conf.BindAddr, "bind-addr", conf.BindAddr,
- "address to bind listeners to")
+ syncerCmd.Flags().StringVar(&conf.Cluster, "cluster", conf.Cluster,
+ "cluster name")
- syncerCmd.Flags().StringVar(&conf.RPCAddr, "rpc-addr", conf.RPCAddr,
- "port to bind RPC listener to")
+ syncerCmd.Flags().StringVar(&conf.Listener.BindAddr, "bind-addr", conf.Listener.BindAddr,
+ "address used to network with other Syncers")
- syncerCmd.Flags().StringVar(&conf.JoinAddr, "join-addr", conf.JoinAddr,
- "address to join the cluster by specifying at least one existing member")
+ syncerCmd.Flags().StringVar(&conf.Listener.RPCAddr, "rpc-addr", conf.Listener.RPCAddr,
+ "port used to synchronize data with other Syncers")
- syncerCmd.Flags().StringVar(&conf.SC.Addr, "sc-addr", conf.SC.Addr,
- "address to monitor the service-center")
+ syncerCmd.Flags().StringVar(&conf.Listener.PeerAddr, "peer-addr", conf.Listener.PeerAddr,
+ "port used to communicate with other cluster members")
- syncerCmd.Flags().StringVar(&conf.ClusterName, "cluster-name", conf.ClusterName,
- "name to group members into cluster")
+ syncerCmd.Flags().StringVar(&conf.Join.Address, "join", "",
+ "address to join the network by specifying at least one existing member")
- syncerCmd.Flags().IntVar(&conf.ClusterPort, "cluster-port", conf.ClusterPort,
- "port to communicate between cluster members")
+ syncerCmd.Flags().StringVar(&conf.Registry.Address, "registry", conf.Registry.Address,
+ "address to monitor the registry")
- syncerCmd.Flags().StringVar(&conf.SC.Plugin, "sc-plugin", conf.SC.Plugin,
+ syncerCmd.Flags().StringVar(&conf.Registry.Plugin, "plugin", conf.Registry.Plugin,
"plugin name of servicecenter")
syncerCmd.Flags().StringVar(&configFile, "config", "",
@@ -72,16 +72,26 @@ func init() {
// runSyncer Runs the Syncer service.
func runSyncer(cmd *cobra.Command, args []string) {
+ if conf.Join.Address != "" {
+ conf.Join.Enabled = true
+ }
+
+ defaultConfig := config.DefaultConfig()
+
if configFile != "" {
fromFile, err := config.LoadConfig(configFile)
if err != nil {
log.Errorf(err, "load config file failed")
return
}
- conf.Merge(fromFile)
+ if fromFile != nil {
+ *defaultConfig = config.Merge(*defaultConfig, *fromFile)
+ }
}
- if err := conf.Verify(); err != nil {
+ *conf = config.Merge(*defaultConfig, *conf)
+ err := config.Verify(conf)
+ if err != nil {
log.Errorf(err, "verify syncer config failed")
return
}
diff --git a/syncer/config/config.go b/syncer/config/config.go
index 35a27ea..c334320 100644
--- a/syncer/config/config.go
+++ b/syncer/config/config.go
@@ -17,82 +17,47 @@
package config
import (
- "crypto/md5"
- "fmt"
"io/ioutil"
"os"
- "strings"
+ "path/filepath"
+ "strconv"
"github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/syncer/etcd"
"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
- "github.com/apache/servicecomb-service-center/syncer/plugins"
- _ "github.com/apache/servicecomb-service-center/syncer/plugins/eureka"
- "github.com/apache/servicecomb-service-center/syncer/plugins/servicecenter"
- "github.com/apache/servicecomb-service-center/syncer/serf"
+ "github.com/pkg/errors"
"gopkg.in/yaml.v2"
+ "k8s.io/apimachinery/pkg/util/uuid"
)
-var (
- DefaultDCPort = 30100
- DefaultClusterPort = 30192
- DefaultTickerInterval = 30
- DefaultConfigPath = "./conf/config.yaml"
-
- syncerName = ""
- servicecenterName = "servicecenter"
-)
-
-// Config is the configuration that can be set for Syncer. Some of these
-// configurations are exposed as command-line flags.
-type Config struct {
- // Wraps the serf config
- *serf.Config
-
- // Wraps the etcd config
- Etcd *etcd.Config
- LogFile string `yaml:"log_file"`
-
- // JoinAddr The management address of one gossip pool member.
- JoinAddr string `yaml:"join_addr"`
- TickerInterval int `yaml:"ticker_interval"`
- Profile string `yaml:"profile"`
- EnableCompression bool `yaml:"enable_compression"`
- AutoSync bool `yaml:"auto_sync"`
- TLSConfig *TLSConfig `yaml:"tls_config"`
- SC *ServiceCenter `yaml:"servicecenter"`
-}
-
-// ServiceCenter configuration
-type ServiceCenter struct {
- // Addr servicecenter address, which is the service registry address.
- // Cluster mode is supported, and multiple addresses are separated by an English ",".
- Addr string `yaml:"addr"`
- Plugin string `yaml:"plugin"`
- TLSConfig *TLSConfig `yaml:"tls_config"`
- Endpoints []string `yaml:"-"`
-}
-
// DefaultConfig returns the default config
func DefaultConfig() *Config {
- serfConf := serf.DefaultConfig()
- etcdConf := etcd.DefaultConfig()
hostname, err := os.Hostname()
if err != nil {
log.Errorf(err, "Error determining hostname: %s", err)
- return nil
+ hostname = string(uuid.NewUUID())
}
- serfConf.NodeName = hostname
- etcdConf.Name = hostname
+
return &Config{
- TickerInterval: DefaultTickerInterval,
- Config: serfConf,
- Etcd: etcdConf,
- TLSConfig: DefaultTLSConfig(),
- SC: &ServiceCenter{
- Addr: fmt.Sprintf("127.0.0.1:%d", DefaultDCPort),
- Plugin: servicecenter.PluginName,
- TLSConfig: NewTLSConfig(servicecenterName),
+ Mode: ModeSingle,
+ Node: hostname,
+ DataDir: defaultDataDir + hostname,
+ Listener: Listener{
+ BindAddr: "0.0.0.0:" + strconv.Itoa(defaultBindPort),
+ RPCAddr: "0.0.0.0:" + strconv.Itoa(defaultRPCPort),
+ PeerAddr: "127.0.0.1:" + strconv.Itoa(defaultPeerPort),
+ },
+ Task: Task{
+ Kind: "ticker",
+ Params: []Label{
+ {
+ Key: defaultTaskKey,
+ Value: defaultTaskValue,
+ },
+ },
+ },
+ Registry: Registry{
+ Address: "http://127.0.0.1:30100",
+ Plugin: defaultDCPluginName,
},
}
}
@@ -100,10 +65,10 @@ func DefaultConfig() *Config {
// LoadConfig loads configuration from file
func LoadConfig(filepath string) (*Config, error) {
if filepath == "" {
- filepath = DefaultConfigPath
+ return nil, nil
}
if !(utils.IsFileExist(filepath)) {
- err := fmt.Errorf("file is not exist")
+ err := errors.New("file is not exist")
log.Errorf(err, "Load config from %s failed", filepath)
return nil, err
}
@@ -123,63 +88,15 @@ func LoadConfig(filepath string) (*Config, error) {
return conf, nil
}
-// Merge other configuration into the current configuration
-func (c *Config) Merge(other *Config) {
- if c.TLSConfig != nil && other.TLSConfig != nil {
- c.TLSConfig.Merge(syncerName, other.TLSConfig)
- }
-
- if c.SC != nil && c.SC.TLSConfig != nil && other.SC != nil && other.SC.TLSConfig != nil {
- c.SC.TLSConfig.Merge(servicecenterName, other.SC.TLSConfig)
- }
-}
-
-// Verify Provide config verification
-func (c *Config) Verify() error {
- ip, port, err := utils.SplitHostPort(c.BindAddr, serf.DefaultBindPort)
- if err != nil {
- return err
- }
- if ip == "127.0.0.1" {
- c.BindAddr = fmt.Sprintf("0.0.0.0:%d", port)
- }
-
- ip, port, err = utils.SplitHostPort(c.RPCAddr, serf.DefaultRPCPort)
- if err != nil {
- return err
- }
- c.RPCPort = port
- if ip == "127.0.0.1" {
- c.RPCAddr = fmt.Sprintf("0.0.0.0:%d", c.RPCPort)
- }
-
- if c.JoinAddr != "" {
- c.RetryJoin = strings.Split(c.JoinAddr, ",")
- }
-
- if c.ClusterName == "" {
- c.ClusterName = fmt.Sprintf("%x", md5.Sum([]byte(c.SC.Addr)))
- }
-
- c.TLSEnabled = c.TLSConfig.Enabled
-
- c.SC.Endpoints = strings.Split(c.SC.Addr, ",")
-
- c.Etcd.SetName(c.NodeName)
- return nil
+func (c *Config) GetTLSConfig(name string) *TLSConfig {
+ return findInTLSConfigs(c.TLSConfigs, name)
}
-func (sc *ServiceCenter) SCConfigOps() []plugins.SCConfigOption {
- opts := []plugins.SCConfigOption{plugins.WithEndpoints(strings.Split(sc.Addr, ","))}
- if sc.TLSConfig.Enabled {
- opts = append(opts,
- plugins.WithTLSEnabled(sc.TLSConfig.Enabled),
- plugins.WithTLSVerifyPeer(sc.TLSConfig.VerifyPeer),
- plugins.WithTLSPassphrase(sc.TLSConfig.Passphrase),
- plugins.WithTLSCAFile(sc.TLSConfig.CAFile),
- plugins.WithTLSCertFile(sc.TLSConfig.CertFile),
- plugins.WithTLSKeyFile(sc.TLSConfig.KeyFile),
- )
+func pathFromSSLEnvOrDefault(server, path string) string {
+ env := os.Getenv(defaultEnvSSLRoot)
+ if len(env) == 0 {
+ wd, _ := os.Getwd()
+ return filepath.Join(wd, defaultCertsDir, server, path)
}
- return opts
+ return os.ExpandEnv(filepath.Join("$"+defaultEnvSSLRoot, server, path))
}
diff --git a/syncer/config/config_test.go b/syncer/config/config_test.go
index 4afb608..317460d 100644
--- a/syncer/config/config_test.go
+++ b/syncer/config/config_test.go
@@ -16,33 +16,215 @@
*/
package config
-import "testing"
+import (
+ "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
-func TestDefaultVerification(t *testing.T) {
+ "github.com/stretchr/testify/assert"
+)
+
+func TestDefaultConfig(t *testing.T) {
conf := DefaultConfig()
- conf.Verify()
+ assert.NotNil(t, conf)
}
-func TestBindAddrVerification(t *testing.T) {
- conf := DefaultConfig()
- conf.BindAddr = "abcde"
- conf.Verify()
+func TestLoadConfig(t *testing.T) {
+ configFile := ""
+ conf, err := LoadConfig(configFile)
+ assert.Nil(t, conf)
+
+ configFile = "./test.yaml"
+ conf, err = LoadConfig(configFile)
+ assert.NotNil(t, err)
+
+ defer os.Remove(configFile)
+ err = createFile(configFile, notYAMLData())
+ assert.Nil(t, err)
+ conf, err = LoadConfig(configFile)
+ assert.NotNil(t, err)
+
+ err = createFile(configFile, correctConfiguration())
+ assert.Nil(t, err)
+ conf, err = LoadConfig(configFile)
+ assert.Nil(t, err)
}
-func TestRPCAddrAddrVerification(t *testing.T) {
- conf := DefaultConfig()
- conf.RPCAddr = "abcde"
- conf.Verify()
+func TestGetTLSConfig(t *testing.T) {
+ configFile := "./test.yaml"
+ defer os.Remove(configFile)
+ err := createFile(configFile, correctConfiguration())
+ assert.Nil(t, err)
+ conf, err := LoadConfig(configFile)
+ assert.Nil(t, err)
+
+ tlsConf := conf.GetTLSConfig(conf.Listener.TLSMount.Name)
+ assert.NotNil(t, tlsConf)
}
-func TestLocalBindAddrVerification(t *testing.T) {
- conf := DefaultConfig()
- conf.BindAddr = "127.0.0.1"
- conf.Verify()
+func TestMerge(t *testing.T) {
+ configFile := "./test.yaml"
+ defer os.Remove(configFile)
+ err := createFile(configFile, correctConfiguration())
+ assert.Nil(t, err)
+ conf, err := LoadConfig(configFile)
+ assert.Nil(t, err)
+
+ nConf := Merge(*conf, *conf, *DefaultConfig())
+ assert.NotNil(t, nConf)
}
-func TestLocalRPCAddrVerification(t *testing.T) {
+func TestVerify(t *testing.T) {
conf := DefaultConfig()
- conf.RPCAddr = "127.0.0.1"
- conf.Verify()
+ conf.DataDir = ""
+ err := Verify(conf)
+ assert.Nil(t, err)
+
+ configFile := "./test.yaml"
+ defer os.Remove(configFile)
+ err = createFile(configFile, correctConfiguration())
+ assert.Nil(t, err)
+ conf, err = LoadConfig(configFile)
+ assert.Nil(t, err)
+ err = Verify(conf)
+ assert.Nil(t, err)
+
+ bindAddr := conf.Listener.BindAddr
+ conf.Listener.BindAddr = ""
+ err = Verify(conf)
+ conf.Listener.BindAddr = bindAddr
+ assert.NotNil(t, err)
+
+ rpcAddr := conf.Listener.RPCAddr
+ conf.Listener.RPCAddr = ""
+ err = Verify(conf)
+ conf.Listener.RPCAddr = rpcAddr
+ assert.NotNil(t, err)
+
+ peerAddr := conf.Listener.PeerAddr
+ conf.Listener.PeerAddr = ""
+ err = Verify(conf)
+ conf.Listener.PeerAddr = peerAddr
+ assert.NotNil(t, err)
+
+ conf.Listener.TLSMount.Enabled = true
+ err = Verify(conf)
+ assert.Nil(t, err)
+
+ conf.Listener.TLSMount.Name += "_test"
+ err = Verify(conf)
+ conf.Listener.TLSMount.Enabled = false
+ assert.NotNil(t, err)
+
+ conf.Registry.TLSMount.Enabled = true
+ err = Verify(conf)
+ assert.NotNil(t, err)
+
+ conf.Registry.TLSMount.Name += "_test"
+ err = Verify(conf)
+ conf.Registry.TLSMount.Enabled = false
+ assert.NotNil(t, err)
+
+ registry := conf.Registry.Address
+ conf.Registry.Address = "127.0.0.1:xxx"
+ err = Verify(conf)
+ conf.Registry.Address = registry
+ assert.NotNil(t, err)
+
+ conf.Join.Enabled = true
+ joinAddr := conf.Join.Address
+ conf.Join.Address = "http://127.0.0.1:9999"
+ err = Verify(conf)
+ conf.Join.Address = joinAddr
+ assert.NotNil(t, err)
+
+ conf.Join.RetryMax = -1
+ conf.Join.RetryInterval = "3mams"
+ err = Verify(conf)
+ conf.Join.Enabled = false
+ assert.Nil(t, err)
+
+ params := conf.Task.Params
+ conf.Task.Kind = ""
+ conf.Task.Params = []Label{{Key: "test", Value:"test"}, {Key:defaultTaskKey, Value: "3mams"}}
+ err = Verify(conf)
+ conf.Task.Params = params
+ assert.NotNil(t, err)
+}
+
+func createFile(path string, data []byte) error {
+ fileDir := filepath.Dir(path)
+ if !utils.IsDirExist(fileDir) {
+ err := os.MkdirAll(fileDir, 0640)
+ if err != nil {
+ return err
+ }
+ }
+ return ioutil.WriteFile(path, data, 0640)
}
+
+func notYAMLData() []byte {
+ return []byte("xxxxxxxxxxxx")
+}
+
+func correctConfiguration() []byte {
+ return []byte(`# run mode, supports (single, cluster)
+mode: signle
+# node name, must be unique on the network
+node: syncer-node
+# Cluster name, clustering by this name
+cluster: syncer-cluster
+dataDir: ./syncer-data/
+listener:
+ # Address used to network with other Syncers in LAN
+ bindAddr: 0.0.0.0:30190
+ # Address used to network with other Syncers in WAN
+ advertiseAddr: ""
+ # Address used to synchronize data with other Syncers
+ rpcAddr: 0.0.0.0:30191
+ # Address used to communicate with other cluster peers
+ peerAddr: 127.0.0.1:30192
+ tlsMount:
+ enabled: false
+ name: syncer
+join:
+ enabled: false
+ # Address to join the network by specifying at least one existing member
+ address: 127.0.0.1:30190
+ # Limit the maximum of RetryJoin, default is 0, means no limit
+ retryMax: 3
+ retryInterval: 30s
+task:
+ kind: ticker
+ params:
+ # Time interval between timing tasks, default is 30s
+ - key: interval
+ value: 30s
+registry:
+ plugin: servicecenter
+ address: http://127.0.0.1:30100
+ tlsMount:
+ enabled: false
+ name: servicecenter
+tlsConfigs:
+ - name: syncer
+ verifyPeer: true
+ minVersion: TLSv1.2
+ caFile: ./certs/trust.cer
+ certFile: ./certs/server.cer
+ keyFile: ./certs/server_key.pem
+ passphrase: ""
+ ciphers:
+ - TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
+ - TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
+ - TLS_RSA_WITH_AES_256_GCM_SHA384
+ - TLS_RSA_WITH_AES_128_GCM_SHA256
+ - name: servicecenter
+ verifyPeer: false
+ caFile: ./certs/trust.cer
+ certFile: ./certs/server.cer
+ keyFile: ./certs/server_key.pem
+`)
+}
\ No newline at end of file
diff --git a/syncer/config/const.go b/syncer/config/const.go
new file mode 100644
index 0000000..5465ea9
--- /dev/null
+++ b/syncer/config/const.go
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+const (
+ defaultBindPort = 30190
+ defaultRPCPort = 30191
+ defaultPeerPort = 30192
+ defaultTaskKind = "ticker"
+ defaultTaskKey = "interval"
+ defaultTaskValue = "30s"
+ defaultDataDir = "./syncer-data/"
+ defaultDCPluginName = "servicecenter"
+ defaultRetryJoinMax = 3
+ defaultRetryJoinInterval = "30s"
+
+ defaultEnvSSLRoot = "SSL_ROOT"
+ defaultCertsDir = "certs"
+ defaultCAName = "trust.cer"
+ defaultCertName = "server.cer"
+ defaultKeyName = "server_key.pem"
+
+ // ModeSingle run as a single server
+ ModeSingle = "single"
+ // ModeCluster run as a cluster peer
+ ModeCluster = "cluster"
+)
+
+// Config is the configuration that can be set for Syncer. Some of these
+// configurations are exposed as command-line flags.
+type Config struct {
+ Mode string `yaml:"mode"`
+ Node string `yaml:"node"`
+ Cluster string `yaml:"cluster"`
+ DataDir string `yaml:"dataDir"`
+ Listener Listener `yaml:"listener"`
+ Join Join `yaml:"join"`
+ Task Task `yaml:"task"`
+ Registry Registry `yaml:"registry"`
+ TLSConfigs []*TLSConfig `yaml:"tlsConfigs"`
+}
+
+// Listener Configuration for Syncer listener
+type Listener struct {
+ BindAddr string `yaml:"bindAddr"`
+ AdvertiseAddr string `yaml:"advertiseAddr"`
+ RPCAddr string `yaml:"rpcAddr"`
+ PeerAddr string `yaml:"peerAddr"`
+ TLSMount Mount `yaml:"tlsMount"`
+}
+
+// Join Configuration for Syncer join the network
+type Join struct {
+ Enabled bool `yaml:"enabled"`
+ Address string `yaml:"address"`
+ RetryMax int `yaml:"retryMax"`
+ RetryInterval string `yaml:"retryInterval"`
+}
+
+// Task
+type Task struct {
+ Kind string `yaml:"kind"`
+ Params []Label `yaml:"params"`
+}
+
+// Label pair of key and value
+type Label struct {
+ Key string
+ Value string
+}
+
+// Registry configuration
+type Registry struct {
+ // Address is the service registry address.
+ Address string `json:"address"`
+ Plugin string `yaml:"plugin"`
+ TLSMount Mount `yaml:"tlsMount"`
+}
+
+// Mount Specifying config and purpose
+type Mount struct {
+ Enabled bool `yaml:"enabled"`
+ Name string `yaml:"name"`
+}
+
+// TLSConfig tls configuration
+type TLSConfig struct {
+ Name string `yaml:"name"`
+ VerifyPeer bool `yaml:"verifyPeer"`
+ MinVersion string `yaml:"minVersion"`
+ Passphrase string `yaml:"passphrase"`
+ CAFile string `yaml:"caFile"`
+ CertFile string `yaml:"certFile"`
+ KeyFile string `yaml:"keyFile"`
+ Ciphers []string `yaml:"ciphers"`
+}
diff --git a/syncer/config/merge.go b/syncer/config/merge.go
new file mode 100644
index 0000000..2b3f2b2
--- /dev/null
+++ b/syncer/config/merge.go
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+ "time"
+)
+
+// Merge multiple configurations into one
+func Merge(configs ...Config) (conf Config) {
+ for _, config := range configs {
+ conf = merge(conf, config)
+ }
+ return
+}
+
+func merge(src, dst Config) Config {
+ src.Mode = mergeString(src.Mode, dst.Mode)
+ src.Node = mergeString(src.Node, dst.Node)
+ src.Cluster = mergeString(src.Cluster, dst.Cluster)
+ src.DataDir = mergeString(src.DataDir, dst.DataDir)
+
+ src.Listener.BindAddr = mergeString(src.Listener.BindAddr, dst.Listener.BindAddr)
+ src.Listener.RPCAddr = mergeString(src.Listener.RPCAddr, dst.Listener.RPCAddr)
+ src.Listener.PeerAddr = mergeString(src.Listener.PeerAddr, dst.Listener.PeerAddr)
+ src.Listener.TLSMount.Enabled = mergeBool(src.Listener.TLSMount.Enabled, dst.Listener.TLSMount.Enabled)
+ src.Listener.TLSMount.Name = mergeString(src.Listener.TLSMount.Name, dst.Listener.TLSMount.Name)
+
+ src.Join.Enabled = mergeBool(src.Join.Enabled, dst.Join.Enabled)
+ src.Join.Address = mergeString(src.Join.Address, dst.Join.Address)
+ src.Join.RetryMax = mergeInt(src.Join.RetryMax, dst.Join.RetryMax)
+ src.Join.RetryInterval = mergeTimeString(src.Join.RetryInterval, dst.Join.RetryInterval)
+
+ src.Task.Kind = mergeString(src.Task.Kind, dst.Task.Kind)
+ src.Task.Params = mergeLabels(src.Task.Params, dst.Task.Params)
+
+ src.Registry.Address = mergeString(src.Registry.Address, dst.Registry.Address)
+ src.Registry.Plugin = mergeString(src.Registry.Plugin, dst.Registry.Plugin)
+ src.Registry.TLSMount.Enabled = mergeBool(src.Registry.TLSMount.Enabled, dst.Registry.TLSMount.Enabled)
+ src.Registry.TLSMount.Name = mergeString(src.Registry.TLSMount.Name, dst.Registry.TLSMount.Name)
+
+ src.TLSConfigs = mergeTLSConfigs(src.TLSConfigs, dst.TLSConfigs)
+ return src
+}
+
+func mergeString(src, dst string) string {
+ if dst != "" {
+ return dst
+ }
+ return src
+}
+
+func mergeInt(src, dst int) int {
+ if dst != 0 {
+ return dst
+ }
+ return src
+}
+
+func mergeBool(src, dst bool) bool {
+ return dst
+}
+
+func mergeTimeString(src, dst string) string {
+ _, err := time.ParseDuration(dst)
+ if err != nil {
+ return src
+ }
+ return dst
+}
+
+func mergeLabels(src, dst []Label) []Label {
+ if len(src) == 0 {
+ return dst[:]
+ }
+
+ merges := src[:]
+ for _, dv := range dst {
+ if findInLabels(src, dv.Key) == nil {
+ merges = append(merges, dv)
+ }
+ }
+ return merges
+}
+
+func findInLabels(labels []Label, key string) *Label {
+ for _, item := range labels {
+ if item.Key == key {
+ return &item
+ }
+ }
+ return nil
+}
+
+func mergeTLSConfigs(src, dst []*TLSConfig) []*TLSConfig {
+ if len(src) == 0 {
+ return dst[:]
+ }
+
+ merges := src[:]
+ for _, dv := range dst {
+ if findInTLSConfigs(src, dv.Name) == nil {
+ merges = append(merges, dv)
+ }
+ }
+ return merges
+}
+
+func findInTLSConfigs(list []*TLSConfig, name string) *TLSConfig {
+ for _, item := range list {
+ if item.Name == name {
+ return item
+ }
+ }
+ return nil
+}
diff --git a/syncer/config/tlsconfig.go b/syncer/config/tlsconfig.go
deleted file mode 100644
index 9f0ce4d..0000000
--- a/syncer/config/tlsconfig.go
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package config
-
-import (
- "crypto/tls"
- "fmt"
- "os"
- "path/filepath"
- "strings"
- "sync"
-
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/tlsutil"
- "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
-)
-
-const (
- dirName = "certs"
- caCert = "trust.cer"
- serverCert = "server.cer"
- serverKey = "server_key.pem"
-)
-
-// TLSConfig tls configuration
-type TLSConfig struct {
- Enabled bool `yaml:"enabled"`
- VerifyPeer bool `yaml:"verify_peer"`
- MinVersion string `yaml:"min_version"`
- Passphrase string `yaml:"passphrase"`
- CAFile string `yaml:"ca_file"`
- CertFile string `yaml:"cert_file"`
- KeyFile string `yaml:"key_file"`
- Ciphers []string `yaml:"ciphers"`
- clientTlsConfig *tls.Config `yaml:"-"`
- serverTlsConfig *tls.Config `yaml:"-"`
- mux sync.Mutex `yaml:"-"`
-}
-
-// DefaultTLSConfig returns default tls configuration
-func DefaultTLSConfig() *TLSConfig {
- return &TLSConfig{
- Enabled: false,
- }
-}
-
-// NewTLSConfig returns tls configuration by server
-func NewTLSConfig(server string) *TLSConfig {
- return &TLSConfig{
- Enabled: true,
- VerifyPeer: true,
- CAFile: pathFromSSLEnvOrDefault(server, caCert),
- CertFile: pathFromSSLEnvOrDefault(server, serverCert),
- KeyFile: pathFromSSLEnvOrDefault(server, serverKey),
- }
-}
-
-// Merge other tls configuration into the current configuration
-func (t *TLSConfig) Merge(server string, other *TLSConfig) {
- if !other.Enabled {
- return
- }
- t.Enabled = other.Enabled
- t.VerifyPeer = other.VerifyPeer
- t.MinVersion = other.MinVersion
-
- if other.CAFile == "" && t.CAFile == "" {
- other.CAFile = pathFromSSLEnvOrDefault(server, caCert)
- }
- t.CAFile = other.CAFile
-
- if other.CertFile == "" && t.CertFile == "" {
- other.CertFile = pathFromSSLEnvOrDefault(server, serverCert)
- }
- t.CertFile = other.CertFile
-
- if other.KeyFile == "" && t.KeyFile == "" {
- other.KeyFile = pathFromSSLEnvOrDefault(server, serverKey)
- }
- t.KeyFile = other.KeyFile
-}
-
-// Verify the tls configuration
-func (t *TLSConfig) Verify() (err error) {
- if !t.Enabled {
- return
- }
-
- if t.CAFile == "" || !utils.IsFileExist(t.CAFile) {
- err = fmt.Errorf("tls ca file '%s' is not found", t.CAFile)
- }
-
- if err == nil && t.CertFile == "" || !utils.IsFileExist(t.CertFile) {
- err = fmt.Errorf("tls cert file '%s' is not found", t.CertFile)
- }
-
- if err == nil && t.KeyFile == "" || !utils.IsFileExist(t.KeyFile) {
- err = fmt.Errorf("tls key file '%s' is not found", t.KeyFile)
- }
-
- if err == nil {
- for _, cipher := range t.Ciphers {
- if _, ok := tlsutil.TLS_CIPHER_SUITE_MAP[cipher]; !ok {
- err = fmt.Errorf("cipher %s not exist", cipher)
- break
- }
- }
- }
-
- if err != nil {
- log.Error("verify tls configuration failed", err)
- }
- return
-}
-
-// ClientTlsConfig get the tls.config of client
-func (t *TLSConfig) ClientTlsConfig() (*tls.Config, error) {
- if !t.Enabled {
- return nil, nil
- }
-
- t.mux.Lock()
- defer t.mux.Unlock()
- if t.clientTlsConfig != nil {
- return t.clientTlsConfig, nil
- }
-
- opts := append(tlsutil.DefaultClientTLSOptions(), t.toOptions()...)
- conf, err := tlsutil.GetClientTLSConfig(opts...)
- if err != nil {
- log.Error("get client tls config failed", err)
- }
- t.clientTlsConfig = conf
- return conf, err
-}
-
-// ServerTlsConfig get the tls.config of server
-func (t *TLSConfig) ServerTlsConfig() (*tls.Config, error) {
- if !t.Enabled {
- return nil, nil
- }
-
- if t.serverTlsConfig != nil {
- return t.serverTlsConfig, nil
- }
-
- opts := append(tlsutil.DefaultServerTLSOptions(), t.toOptions()...)
- conf, err := tlsutil.GetServerTLSConfig(opts...)
- if err != nil {
- log.Error("get server tls config failed", err)
- }
- t.serverTlsConfig = conf
- return conf, err
-}
-
-func (t *TLSConfig) toOptions() []tlsutil.SSLConfigOption {
- return []tlsutil.SSLConfigOption{
- tlsutil.WithVerifyPeer(t.VerifyPeer),
- tlsutil.WithVersion(tlsutil.ParseSSLProtocol(t.MinVersion), tls.VersionTLS12),
- tlsutil.WithCipherSuits(
- tlsutil.ParseDefaultSSLCipherSuites(strings.Join(t.Ciphers, ","))),
- tlsutil.WithKeyPass(t.Passphrase),
- tlsutil.WithCA(t.CAFile),
- tlsutil.WithCert(t.CertFile),
- tlsutil.WithKey(t.KeyFile),
- }
-}
-
-func pathFromSSLEnvOrDefault(server, path string) string {
- env := os.Getenv("SSL_ROOT")
- if len(env) == 0 {
- wd, _ := os.Getwd()
- return filepath.Join(wd, dirName, server, path)
- }
- return os.ExpandEnv(filepath.Join("$SSL_ROOT", server, path))
-}
diff --git a/syncer/config/verify.go b/syncer/config/verify.go
new file mode 100644
index 0000000..13e37d6
--- /dev/null
+++ b/syncer/config/verify.go
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+ "crypto/md5"
+ "fmt"
+ "net"
+ "net/url"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/tlsutil"
+ "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
+ "github.com/pkg/errors"
+)
+
+// Verify Provide config verification
+func Verify(c *Config) (err error) {
+ if err = verifyListener(&c.Listener); err != nil {
+ return
+ }
+
+ if err = verifyJoin(&c.Join); err != nil {
+ return
+ }
+
+ if err = verifyTask(&c.Task); err != nil {
+ return
+ }
+
+ if err = verifyRegistry(&c.Registry); err != nil {
+ return
+ }
+
+ if c.Listener.TLSMount.Enabled {
+ listenerTls := c.GetTLSConfig(c.Listener.TLSMount.Name)
+ if listenerTls == nil {
+ err = errors.Errorf("listener tls config notfound, name = %s", c.Listener.TLSMount.Name)
+ return
+ }
+ if err = verifyTLSConfig(listenerTls); err == nil {
+ return
+ }
+ }
+
+ if c.Registry.TLSMount.Enabled {
+ registryTls := c.GetTLSConfig(c.Listener.TLSMount.Name)
+ if registryTls == nil {
+ err = errors.Errorf("registry tls config notfound, name = %s", c.Registry.TLSMount.Name)
+ return
+ }
+ if err = verifyTLSConfig(registryTls); err == nil {
+ return
+ }
+ }
+
+ if c.Cluster == "" {
+ endpoints := strings.Split(c.Registry.Address, ",")
+ sort.Strings(endpoints)
+ str := strings.Join(endpoints, ",")
+ c.Cluster = fmt.Sprintf("%x", md5.Sum([]byte(str)))
+ }
+
+ if c.DataDir == "" {
+ c.DataDir = defaultDataDir + c.Node
+ }
+ c.DataDir = filepath.Dir(c.DataDir)
+ return nil
+}
+
+func verifyListener(listener *Listener) (err error) {
+ bindHost, bindPort, bErr := utils.SplitHostPort(listener.BindAddr, defaultBindPort)
+ if bErr != nil {
+ err = errors.Wrapf(bErr, "verify bind address failed, url is %s", listener.BindAddr)
+ return
+ }
+ listener.BindAddr = bindHost + ":" + strconv.Itoa(bindPort)
+
+ rpcHost, rpcPort, rErr := utils.SplitHostPort(listener.RPCAddr, defaultRPCPort)
+ if rErr != nil {
+ err = errors.Wrapf(rErr, "verify rpc address failed, url is %s", listener.RPCAddr)
+ return
+ }
+ listener.RPCAddr = rpcHost + ":" + strconv.Itoa(rpcPort)
+
+ peerHost, peerPort, pErr := utils.SplitHostPort(listener.PeerAddr, defaultPeerPort)
+ if pErr != nil {
+ err = errors.Wrapf(pErr, "verify peer address failed, url is %s", listener.PeerAddr)
+ return
+ }
+ listener.PeerAddr = peerHost + ":" + strconv.Itoa(peerPort)
+ return
+}
+
+func verifyJoin(join *Join) (err error) {
+ if !join.Enabled {
+ return
+ }
+ endpoints := strings.Split(join.Address, ",")
+ for _, addr := range endpoints {
+ _, _, err1 := net.SplitHostPort(addr)
+ if err1 != nil {
+ err = errors.Wrapf(err1, "Verify joinAddr failed, urls has %s", addr)
+ return
+ }
+ }
+
+ if join.RetryMax < 0 {
+ join.RetryMax = defaultRetryJoinMax
+ }
+
+ if _, err1 := time.ParseDuration(join.RetryInterval); err1 != nil {
+ log.Warnf("join retry interval '%s' is wrong", join.RetryInterval)
+ join.RetryInterval = defaultRetryJoinInterval
+ }
+ return
+}
+
+func verifyTask(task *Task) (err error) {
+ if task.Kind == "" {
+ task.Kind = defaultTaskKind
+ }
+
+ if task.Kind == defaultTaskKind {
+ for _, label := range task.Params {
+ if label.Key != defaultTaskKey {
+ continue
+ }
+ _, err1 := time.ParseDuration(label.Value)
+ if err1 != nil {
+ err = errors.Wrapf(err1, "Verify task params failed, key = %s, value = %s", label.Key, label.Value)
+ return
+ }
+ }
+ }
+ return
+}
+
+func verifyRegistry(r *Registry) (err error) {
+ endpoints := strings.Split(r.Address, ",")
+ for _, addr := range endpoints {
+ _, err = url.Parse(addr)
+ if err != nil {
+ log.Errorf(err, "Verify registry endpoints failed, urls has %s", addr)
+ return err
+ }
+ }
+ return nil
+}
+
+func verifyTLSConfig(conf *TLSConfig) (err error) {
+ if conf.CAFile == "" || !utils.IsFileExist(conf.CAFile) {
+ conf.CAFile = pathFromSSLEnvOrDefault(conf.Name, defaultCAName)
+ if !utils.IsFileExist(conf.CAFile) {
+ err = errors.Errorf("tls ca file '%s' is not found", conf.CAFile)
+ return
+ }
+ }
+
+ if conf.CertFile == "" || !utils.IsFileExist(conf.CertFile) {
+ conf.CertFile = pathFromSSLEnvOrDefault(conf.Name, defaultCertName)
+ if !utils.IsFileExist(conf.CertFile) {
+ err = errors.Errorf("tls cert file '%s' is not found", conf.CertFile)
+ return
+ }
+ }
+
+ if conf.KeyFile == "" || !utils.IsFileExist(conf.KeyFile) {
+ conf.KeyFile = pathFromSSLEnvOrDefault(conf.Name, defaultKeyName)
+ if !utils.IsFileExist(conf.KeyFile) {
+ err = errors.Errorf("tls key file '%s' is not found", conf.KeyFile)
+ return
+ }
+ }
+
+ for _, cipher := range conf.Ciphers {
+ if _, ok := tlsutil.TLS_CIPHER_SUITE_MAP[cipher]; !ok {
+ err = errors.Errorf("cipher %s not exist", cipher)
+ return
+ }
+ }
+ return
+}
diff --git a/syncer/pkg/utils/addr.go b/syncer/pkg/utils/addr.go
index f84c4fe..0fbde0b 100644
--- a/syncer/pkg/utils/addr.go
+++ b/syncer/pkg/utils/addr.go
@@ -17,21 +17,32 @@
package utils
import (
- "fmt"
"net"
+ "strconv"
+ "strings"
)
// SplitHostPort returns the parts of the address and port. If the port does not exist, use defaultPort.
func SplitHostPort(address string, defaultPort int) (string, int, error) {
_, _, err := net.SplitHostPort(address)
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
- address = fmt.Sprintf("%s:%d", address, defaultPort)
- _, _, err = net.SplitHostPort(address)
+ index := strings.LastIndexByte(address, ':')
+ return SplitAddress(address[:index+1] + strconv.Itoa(defaultPort))
}
+ return ResolveAddr(address)
+}
+
+// SplitAddress returns the parts of the address and port.
+func SplitAddress(address string) (string, int, error) {
+ _, _, err := net.SplitHostPort(address)
if err != nil {
return "", 0, err
}
+ return ResolveAddr(address)
+}
+// ResolveAddr Resolve the address with tcp.
+func ResolveAddr(address string) (string, int, error) {
addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return "", 0, err
diff --git a/syncer/server/convert.go b/syncer/server/convert.go
new file mode 100644
index 0000000..1f56762
--- /dev/null
+++ b/syncer/server/convert.go
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package server
+
+import (
+ "crypto/tls"
+ "net/url"
+ "strings"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/pkg/tlsutil"
+ "github.com/apache/servicecomb-service-center/syncer/config"
+ "github.com/apache/servicecomb-service-center/syncer/etcd"
+ "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
+ "github.com/apache/servicecomb-service-center/syncer/plugins"
+ "github.com/apache/servicecomb-service-center/syncer/serf"
+)
+
+func convertSerfConfig(c *config.Config) *serf.Config {
+ conf := serf.DefaultConfig()
+ conf.NodeName = c.Node
+ conf.ClusterName = c.Cluster
+ conf.Mode = c.Mode
+ conf.TLSEnabled = c.Listener.TLSMount.Enabled
+ conf.BindAddr = c.Listener.BindAddr
+ _, conf.ClusterPort, _ = utils.ResolveAddr(c.Listener.PeerAddr)
+ _, conf.RPCPort, _ = utils.ResolveAddr(c.Listener.RPCAddr)
+ if c.Join.Enabled {
+ conf.RetryJoin = strings.Split(c.Join.Address, ",")
+ conf.RetryInterval, _ = time.ParseDuration(c.Join.RetryInterval)
+ conf.RetryMaxAttempts = c.Join.RetryMax
+ }
+ return conf
+}
+
+func convertEtcdConfig(c *config.Config) *etcd.Config {
+ conf := etcd.DefaultConfig()
+ conf.Name = c.Node
+ conf.Dir = c.DataDir
+ proto := "http://"
+
+ if c.Listener.TLSMount.Enabled {
+ proto = "https://"
+ }
+
+ peer, _ := url.Parse(proto + c.Listener.PeerAddr)
+ conf.APUrls = []url.URL{*peer}
+ conf.LPUrls = []url.URL{*peer}
+ return conf
+}
+
+func convertTickerInterval(c *config.Config) int {
+ strNum := ""
+ for _, label := range c.Task.Params {
+ if label.Key == "interval" {
+ strNum = label.Value
+ break
+ }
+ }
+ interval, _ := time.ParseDuration(strNum)
+ return int(interval.Seconds())
+}
+
+func convertSCConfigOption(c *config.Config) []plugins.SCConfigOption {
+ endpoints := make([]string, 0, 10)
+ for _, endpoint := range strings.Split(c.Registry.Address, ",") {
+ endpoints = append(endpoints, endpoint)
+ }
+ opts := []plugins.SCConfigOption{plugins.WithEndpoints(endpoints)}
+
+ if c.Registry.TLSMount.Enabled {
+ tlsConf := c.GetTLSConfig(c.Registry.TLSMount.Name)
+ opts = append(
+ opts, plugins.WithTLSEnabled(c.Registry.TLSMount.Enabled),
+ plugins.WithTLSVerifyPeer(tlsConf.VerifyPeer),
+ plugins.WithTLSPassphrase(tlsConf.Passphrase),
+ plugins.WithTLSCAFile(tlsConf.CAFile),
+ plugins.WithTLSCertFile(tlsConf.CertFile),
+ plugins.WithTLSKeyFile(tlsConf.KeyFile),
+ )
+ }
+ return opts
+}
+
+func tlsConfigToOptions(t *config.TLSConfig) []tlsutil.SSLConfigOption {
+ return []tlsutil.SSLConfigOption{
+ tlsutil.WithVerifyPeer(t.VerifyPeer),
+ tlsutil.WithVersion(tlsutil.ParseSSLProtocol(t.MinVersion), tls.VersionTLS12),
+ tlsutil.WithCipherSuits(
+ tlsutil.ParseDefaultSSLCipherSuites(strings.Join(t.Ciphers, ","))),
+ tlsutil.WithKeyPass(t.Passphrase),
+ tlsutil.WithCA(t.CAFile),
+ tlsutil.WithCert(t.CertFile),
+ tlsutil.WithKey(t.KeyFile),
+ }
+}
diff --git a/syncer/server/handler.go b/syncer/server/handler.go
index d18a20b..e7c349e 100644
--- a/syncer/server/handler.go
+++ b/syncer/server/handler.go
@@ -23,6 +23,7 @@ import (
"strconv"
"github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/tlsutil"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/syncer/grpc"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
@@ -45,7 +46,7 @@ func (s *Server) tickHandler(ctx context.Context) {
s.servicecenter.FlushData()
// sends a UserEvent on Serf, the event will be broadcast between members
- err := s.agent.UserEvent(EventDiscovered, util.StringToBytesWithNoCopy(s.conf.ClusterName), true)
+ err := s.agent.UserEvent(EventDiscovered, util.StringToBytesWithNoCopy(s.conf.Cluster), true)
if err != nil {
log.Errorf(err, "Syncer send user event failed")
}
@@ -63,7 +64,6 @@ func (s *Server) HandleEvent(event serf.Event) {
if !s.etcd.IsLeader() {
return
}
-
switch event.EventType() {
case serf.EventUser:
s.userEvent(event.(serf.UserEvent))
@@ -80,7 +80,7 @@ func (s *Server) userEvent(event serf.UserEvent) {
clusterName := util.BytesToStringWithNoCopy(event.Payload)
// Excludes notifications from self, as the gossip protocol inevitably has redundant notifications
- if s.conf.ClusterName == clusterName {
+ if s.conf.Cluster == clusterName {
return
}
@@ -102,7 +102,9 @@ func (s *Server) userEvent(event serf.UserEvent) {
}
var tlsConfig *tls.Config
if enabled {
- tlsConfig, err = s.conf.TLSConfig.ClientTlsConfig()
+ conf := s.conf.GetTLSConfig(s.conf.Listener.TLSMount.Name)
+ sslOps := append(tlsutil.DefaultClientTLSOptions(), tlsConfigToOptions(conf)...)
+ tlsConfig, err = tlsutil.GetClientTLSConfig(sslOps...)
if err != nil {
log.Error("get grpc client tls config failed", err)
return
diff --git a/syncer/server/server.go b/syncer/server/server.go
index 8df56f6..e5eafdf 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -18,6 +18,7 @@ package server
import (
"context"
+ "crypto/tls"
"errors"
"net/url"
"strconv"
@@ -25,14 +26,20 @@ import (
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/tlsutil"
"github.com/apache/servicecomb-service-center/syncer/config"
"github.com/apache/servicecomb-service-center/syncer/etcd"
"github.com/apache/servicecomb-service-center/syncer/grpc"
"github.com/apache/servicecomb-service-center/syncer/pkg/syssig"
"github.com/apache/servicecomb-service-center/syncer/pkg/ticker"
+ "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
"github.com/apache/servicecomb-service-center/syncer/plugins"
"github.com/apache/servicecomb-service-center/syncer/serf"
"github.com/apache/servicecomb-service-center/syncer/servicecenter"
+
+ // import plugins
+ _ "github.com/apache/servicecomb-service-center/syncer/plugins/eureka"
+ _ "github.com/apache/servicecomb-service-center/syncer/plugins/servicecenter"
)
var stopChanErr = errors.New("stopped syncer by stopCh")
@@ -60,7 +67,8 @@ type Server struct {
// Wrap the servicecenter
servicecenter servicecenter.Servicecenter
- etcd *etcd.Agent
+ etcd *etcd.Agent
+ etcdConf *etcd.Config
// Wraps the serf agent
agent *serf.Agent
@@ -118,8 +126,10 @@ func (s *Server) Run(ctx context.Context) {
s.servicecenter.SetStorageEngine(s.etcd.Storage())
s.agent.RegisterEventHandler(s)
-
gopool.Go(s.tick.Start)
+
+ log.Info("start service done")
+
<-s.stopCh
s.Stop()
@@ -173,52 +183,63 @@ func (s *Server) initialization() (err error) {
return
}
- s.agent, err = serf.Create(s.conf.Config)
+ s.agent, err = serf.Create(convertSerfConfig(s.conf))
if err != nil {
log.Errorf(err, "Create serf failed, %s", err)
return
}
- s.etcd = etcd.NewAgent(s.conf.Etcd)
+ s.etcdConf = convertEtcdConfig(s.conf)
+ s.etcd = etcd.NewAgent(s.etcdConf)
- s.tick = ticker.NewTaskTicker(s.conf.TickerInterval, s.tickHandler)
+ s.tick = ticker.NewTaskTicker(convertTickerInterval(s.conf), s.tickHandler)
- s.servicecenter, err = servicecenter.NewServicecenter(s.conf.SC.SCConfigOps()...)
+ s.servicecenter, err = servicecenter.NewServicecenter(convertSCConfigOption(s.conf)...)
if err != nil {
log.Error("create servicecenter failed", err)
return
}
- tlsConfig, err := s.conf.TLSConfig.ServerTlsConfig()
- if err != nil {
- log.Error("get grpc server tls config failed", err)
- return
+ var tlsConfig *tls.Config
+ if s.conf.Listener.TLSMount.Enabled {
+ conf := s.conf.GetTLSConfig(s.conf.Listener.TLSMount.Name)
+ sslOps := append(tlsutil.DefaultServerTLSOptions(), tlsConfigToOptions(conf)...)
+ tlsConfig, err = tlsutil.GetServerTLSConfig(sslOps...)
+ if err != nil {
+ log.Error("get grpc server tls config failed", err)
+ return
+ }
}
- s.grpc = grpc.NewServer(s.conf.RPCAddr, s, tlsConfig)
+
+ s.grpc = grpc.NewServer(s.conf.Listener.RPCAddr, s, tlsConfig)
return nil
}
// initPlugin Initialize the plugin and load the external plugin according to the configuration
func (s *Server) initPlugin() {
- plugins.SetPluginConfig(plugins.PluginServicecenter.String(), s.conf.SC.Plugin)
+ plugins.SetPluginConfig(plugins.PluginServicecenter.String(), s.conf.Registry.Plugin)
plugins.LoadPlugins()
}
// configureCluster Configuring the cluster by serf group member information
func (s *Server) configureCluster() error {
- proto := "http" // todoļ¼Introduce tls config to manage protocol
+ proto := "http"
+ if s.conf.Listener.TLSMount.Enabled {
+ proto = "https"
+ }
initialCluster := ""
// get local member of serf
self := s.agent.LocalMember()
- peerUrl, err := url.Parse(proto + "://" + self.Addr.String() + ":" + strconv.Itoa(s.conf.ClusterPort))
+ _, peerPort, _ := utils.SplitAddress(s.conf.Listener.PeerAddr)
+ peerUrl, err := url.Parse(proto + "://" + self.Addr.String() + ":" + strconv.Itoa(peerPort))
if err != nil {
log.Error("parse url from serf local member failed", err)
return err
}
// group members from serf as initial cluster members
- for _, member := range s.agent.GroupMembers(s.conf.ClusterName) {
+ for _, member := range s.agent.GroupMembers(s.conf.Cluster) {
initialCluster += member.Name + "=" + proto + "://" + member.Addr.String() + ":" + member.Tags[serf.TagKeyClusterPort] + ","
}
@@ -228,8 +249,8 @@ func (s *Server) configureCluster() error {
log.Error("etcd peer not found", err)
return err
}
- s.conf.Etcd.APUrls = []url.URL{*peerUrl}
- s.conf.Etcd.LPUrls = []url.URL{*peerUrl}
- s.conf.Etcd.InitialCluster = initialCluster[:len(initialCluster)-1]
+ s.etcdConf.APUrls = []url.URL{*peerUrl}
+ s.etcdConf.LPUrls = []url.URL{*peerUrl}
+ s.etcdConf.InitialCluster = initialCluster[:len(initialCluster)-1]
return nil
}
diff --git a/syncer/servicecenter/servicecenter_test.go b/syncer/servicecenter/servicecenter_test.go
index 5b8c4a5..641b93c 100644
--- a/syncer/servicecenter/servicecenter_test.go
+++ b/syncer/servicecenter/servicecenter_test.go
@@ -52,7 +52,7 @@ func TestNewServicecenter(t *testing.T) {
func TestOnEvent(t *testing.T) {
conf := config.DefaultConfig()
- conf.SC.Plugin = mockplugin.PluginName
+ conf.Registry.Plugin = mockplugin.PluginName
initPlugin(conf)
dc, err := servicecenter.NewServicecenter(
plugins.WithEndpoints([]string{"127.0.0.1:30100"}))
@@ -120,5 +120,5 @@ func TestOnEvent(t *testing.T) {
}
func initPlugin(conf *config.Config) {
- plugins.SetPluginConfig(plugins.PluginServicecenter.String(), conf.SC.Plugin)
+ plugins.SetPluginConfig(plugins.PluginServicecenter.String(), conf.Registry.Plugin)
}