You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ma...@apache.org on 2019/08/27 03:55:30 UTC
[servicecomb-service-center] branch master updated: Support for tls
certificates when using GRPC for data synchronization
This is an automated email from the ASF dual-hosted git repository.
mabin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new eaad9ee Support for tls certificates when using GRPC for data synchronization
new 5840fb6 Merge pull request #572 from ChinX/syncer
eaad9ee is described below
commit eaad9ee687a1a3f25df21a8a07420c35d1955c3d
Author: chinx <c5...@126.com>
AuthorDate: Thu Aug 15 18:34:38 2019 +0800
Support for tls certificates when using GRPC for data synchronization
---
go.mod | 2 +-
syncer/cmd/daemon.go | 19 ++++-
syncer/config/config.go | 54 ++++++++++--
syncer/config/config_test.go | 10 +--
syncer/config/tlsconfig.go | 191 +++++++++++++++++++++++++++++++++++++++++++
syncer/grpc/client.go | 20 ++++-
syncer/grpc/server.go | 14 +++-
syncer/serf/config.go | 11 ++-
syncer/server/handler.go | 20 ++++-
syncer/server/server.go | 7 +-
10 files changed, 319 insertions(+), 29 deletions(-)
diff --git a/go.mod b/go.mod
index bd8dc7b..4faac22 100644
--- a/go.mod
+++ b/go.mod
@@ -107,7 +107,7 @@ require (
gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
- gopkg.in/yaml.v2 v2.2.1 // indirect
+ gopkg.in/yaml.v2 v2.2.1
k8s.io/api v0.0.0-20180601181742-8b7507fac302
k8s.io/apimachinery v0.0.0-20180601181227-17529ec7eadb
k8s.io/client-go v2.0.0-alpha.0.0.20180817174322-745ca8300397+incompatible
diff --git a/syncer/cmd/daemon.go b/syncer/cmd/daemon.go
index 70b65da..898d6af 100644
--- a/syncer/cmd/daemon.go
+++ b/syncer/cmd/daemon.go
@@ -26,7 +26,8 @@ import (
)
var (
- conf = config.DefaultConfig()
+ conf = config.DefaultConfig()
+ configFile = ""
)
var syncerCmd = &cobra.Command{
@@ -64,12 +65,24 @@ func init() {
syncerCmd.Flags().StringVar(&conf.ServicecenterPlugin, "sc-plugin", conf.ServicecenterPlugin,
"plugin name of servicecenter")
+
+ syncerCmd.Flags().StringVar(&configFile, "config", "",
+ "configuration from file")
}
// runSyncer Runs the Syncer service.
func runSyncer(cmd *cobra.Command, args []string) {
- if err := conf.Verification(); err != nil {
- log.Errorf(err, "verification syncer config failed")
+ if configFile != "" {
+ fromFile, err := config.LoadConfig(configFile)
+ if err != nil {
+ log.Errorf(err, "load config file failed")
+ return
+ }
+ conf.Merge(fromFile)
+ }
+
+ if err := conf.Verify(); err != nil {
+ log.Errorf(err, "verify syncer config failed")
return
}
diff --git a/syncer/config/config.go b/syncer/config/config.go
index 723eab8..cc86dc8 100644
--- a/syncer/config/config.go
+++ b/syncer/config/config.go
@@ -19,6 +19,7 @@ package config
import (
"crypto/md5"
"fmt"
+ "io/ioutil"
"os"
"strings"
@@ -28,12 +29,14 @@ import (
_ "github.com/apache/servicecomb-service-center/syncer/plugins/eureka"
"github.com/apache/servicecomb-service-center/syncer/plugins/servicecenter"
"github.com/apache/servicecomb-service-center/syncer/serf"
+ "gopkg.in/yaml.v2"
)
var (
DefaultDCPort = 30100
DefaultClusterPort = 30192
DefaultTickerInterval = 30
+ DefaultConfigPath = "./conf/config.yaml"
)
// Config is the configuration that can be set for Syncer. Some of these
@@ -51,12 +54,13 @@ type Config struct {
SCAddr string `yaml:"dc_addr"`
// JoinAddr The management address of one gossip pool member.
- JoinAddr string `yaml:"join_addr"`
- TickerInterval int `yaml:"ticker_interval"`
- Profile string `yaml:"profile"`
- EnableCompression bool `yaml:"enable_compression"`
- AutoSync bool `yaml:"auto_sync"`
- ServicecenterPlugin string `yaml:"servicecenter_plugin"`
+ JoinAddr string `yaml:"join_addr"`
+ TickerInterval int `yaml:"ticker_interval"`
+ Profile string `yaml:"profile"`
+ EnableCompression bool `yaml:"enable_compression"`
+ AutoSync bool `yaml:"auto_sync"`
+ TLSConfig *TLSConfig `yaml:"tls_config"`
+ ServicecenterPlugin string `yaml:"servicecenter_plugin"`
}
// DefaultConfig returns the default config
@@ -75,12 +79,44 @@ func DefaultConfig() *Config {
TickerInterval: DefaultTickerInterval,
Config: serfConf,
Etcd: etcdConf,
+ TLSConfig: DefaultTLSConfig(),
ServicecenterPlugin: servicecenter.PluginName,
}
}
-// Verification Provide config verification
-func (c *Config) Verification() error {
+// LoadConfig reads the YAML file at filepath and decodes it into a new Config.
+//
+// An empty filepath falls back to DefaultConfigPath. The returned Config holds
+// only what the file sets; no defaults are applied here. An error is returned
+// when the file does not exist, cannot be read, or is not valid YAML.
+func LoadConfig(filepath string) (*Config, error) {
+	if filepath == "" {
+		filepath = DefaultConfigPath
+	}
+	if !utils.IsFileExist(filepath) {
+		err := fmt.Errorf("config file '%s' does not exist", filepath)
+		log.Errorf(err, "Load config from %s failed", filepath)
+		return nil, err
+	}
+
+	byteArr, err := ioutil.ReadFile(filepath)
+	if err != nil {
+		log.Errorf(err, "Load config from %s failed", filepath)
+		return nil, err
+	}
+
+	conf := &Config{}
+	if err = yaml.Unmarshal(byteArr, conf); err != nil {
+		// Do not echo the file content into the log: the tls_config section
+		// may carry the private key passphrase.
+		log.Errorf(err, "Unmarshal config file %s failed", filepath)
+		return nil, err
+	}
+	return conf, nil
+}
+
+// Merge copies settings from other into c. Only the TLS section is merged;
+// every other field of c is left untouched.
+//
+// A nil other, or one whose TLSConfig is nil (a config file without a
+// tls_config section decodes that way), is ignored instead of causing a nil
+// pointer dereference.
+func (c *Config) Merge(other *Config) {
+	if other == nil || other.TLSConfig == nil {
+		return
+	}
+	if c.TLSConfig == nil {
+		c.TLSConfig = DefaultTLSConfig()
+	}
+	c.TLSConfig.Merge("", other.TLSConfig)
+}
+
+// Verify Provide config verification
+func (c *Config) Verify() error {
ip, port, err := utils.SplitHostPort(c.BindAddr, serf.DefaultBindPort)
if err != nil {
return err
@@ -106,6 +142,8 @@ func (c *Config) Verification() error {
c.ClusterName = fmt.Sprintf("%x", md5.Sum([]byte(c.SCAddr)))
}
+ c.TLSEnabled = c.TLSConfig.Enabled
+
c.Etcd.SetName(c.NodeName)
return nil
}
diff --git a/syncer/config/config_test.go b/syncer/config/config_test.go
index 629c102..4afb608 100644
--- a/syncer/config/config_test.go
+++ b/syncer/config/config_test.go
@@ -20,29 +20,29 @@ import "testing"
func TestDefaultVerification(t *testing.T) {
conf := DefaultConfig()
- conf.Verification()
+ conf.Verify()
}
func TestBindAddrVerification(t *testing.T) {
conf := DefaultConfig()
conf.BindAddr = "abcde"
- conf.Verification()
+ conf.Verify()
}
func TestRPCAddrAddrVerification(t *testing.T) {
conf := DefaultConfig()
conf.RPCAddr = "abcde"
- conf.Verification()
+ conf.Verify()
}
func TestLocalBindAddrVerification(t *testing.T) {
conf := DefaultConfig()
conf.BindAddr = "127.0.0.1"
- conf.Verification()
+ conf.Verify()
}
func TestLocalRPCAddrVerification(t *testing.T) {
conf := DefaultConfig()
conf.RPCAddr = "127.0.0.1"
- conf.Verification()
+ conf.Verify()
}
diff --git a/syncer/config/tlsconfig.go b/syncer/config/tlsconfig.go
new file mode 100644
index 0000000..a2e7445
--- /dev/null
+++ b/syncer/config/tlsconfig.go
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+ "crypto/tls"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/tlsutil"
+ "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
+)
+
+const (
+ dirName = "certs" // directory under the working directory searched when SSL_ROOT is unset
+ caCert = "trust.cer" // default file name for TLSConfig.CAFile
+ serverCert = "server.cer" // default file name for TLSConfig.CertFile
+ serverKey = "server_key.pem" // default file name for TLSConfig.KeyFile
+)
+
+// TLSConfig holds the TLS settings of the syncer; Config exposes it under the tls_config YAML key.
+type TLSConfig struct {
+ Enabled bool `yaml:"enabled"` // when false, Verify and the tls.Config getters return early
+ VerifyPeer bool `yaml:"verify_peer"` // forwarded to tlsutil.WithVerifyPeer
+ MinVersion string `yaml:"min_version"` // parsed with tlsutil.ParseSSLProtocol
+ Passphrase string `yaml:"passphrase"` // forwarded to tlsutil.WithKeyPass
+ CAFile string `yaml:"ca_file"` // path of the CA certificate
+ CertFile string `yaml:"cert_file"` // path of the certificate
+ KeyFile string `yaml:"key_file"` // path of the private key
+ Ciphers []string `yaml:"ciphers"` // each name must be a key of tlsutil.TLS_CIPHER_SUITE_MAP (checked by Verify)
+ clientTlsConfig *tls.Config `yaml:"-"` // cache filled by ClientTlsConfig
+ serverTlsConfig *tls.Config `yaml:"-"` // cache filled by ServerTlsConfig
+ mux sync.Mutex `yaml:"-"` // held by ClientTlsConfig while it reads or fills its cache
+}
+
+// DefaultTLSConfig returns a TLSConfig with TLS switched off and every other
+// field left at its zero value.
+func DefaultTLSConfig() *TLSConfig {
+	conf := new(TLSConfig)
+	conf.Enabled = false
+	return conf
+}
+
+// NewTLSConfig returns an enabled, peer-verifying TLSConfig whose CA,
+// certificate and key paths are the default locations for the given server
+// name, as resolved by pathFromSSLEnvOrDefault.
+func NewTLSConfig(server string) *TLSConfig {
+	conf := &TLSConfig{Enabled: true, VerifyPeer: true}
+	conf.CAFile = pathFromSSLEnvOrDefault(server, caCert)
+	conf.CertFile = pathFromSSLEnvOrDefault(server, serverCert)
+	conf.KeyFile = pathFromSSLEnvOrDefault(server, serverKey)
+	return conf
+}
+
+// Merge overlays other onto t. Nothing happens when other is nil or has TLS
+// disabled.
+//
+// Enabled, VerifyPeer and MinVersion are always taken from other. Passphrase
+// and Ciphers are taken from other when it sets them, so values from the
+// config file are no longer silently dropped. For each of the CA, certificate
+// and key paths the value from other wins; if other leaves it empty the value
+// already on t is kept, and only when both are empty is the default location
+// for server used. other itself is never modified.
+func (t *TLSConfig) Merge(server string, other *TLSConfig) {
+	if other == nil || !other.Enabled {
+		return
+	}
+	t.Enabled = other.Enabled
+	t.VerifyPeer = other.VerifyPeer
+	t.MinVersion = other.MinVersion
+
+	if other.Passphrase != "" {
+		t.Passphrase = other.Passphrase
+	}
+	if len(other.Ciphers) > 0 {
+		// Copy so that later changes to other cannot alias into t.
+		t.Ciphers = append([]string(nil), other.Ciphers...)
+	}
+
+	t.CAFile = mergeTLSPath(t.CAFile, other.CAFile, server, caCert)
+	t.CertFile = mergeTLSPath(t.CertFile, other.CertFile, server, serverCert)
+	t.KeyFile = mergeTLSPath(t.KeyFile, other.KeyFile, server, serverKey)
+}
+
+// mergeTLSPath picks override when set, otherwise current when set, otherwise
+// the default location of the file called name for server.
+func mergeTLSPath(current, override, server, name string) string {
+	switch {
+	case override != "":
+		return override
+	case current != "":
+		return current
+	default:
+		return pathFromSSLEnvOrDefault(server, name)
+	}
+}
+
+// Verify checks the TLS settings and returns the first problem found, or nil.
+//
+// A disabled configuration is always valid. Otherwise the CA, certificate and
+// key files must each be set and exist, in that order, and every entry of
+// Ciphers must be a key of tlsutil.TLS_CIPHER_SUITE_MAP. The returned error is
+// also logged.
+func (t *TLSConfig) Verify() (err error) {
+	if !t.Enabled {
+		return nil
+	}
+
+	// A switch stops at the first failing case. The previous chain of
+	// `err == nil && a || b` conditions grouped as `(err == nil && a) || b`,
+	// which let a later check overwrite an earlier error.
+	switch {
+	case t.CAFile == "" || !utils.IsFileExist(t.CAFile):
+		err = fmt.Errorf("tls ca file '%s' is not found", t.CAFile)
+	case t.CertFile == "" || !utils.IsFileExist(t.CertFile):
+		err = fmt.Errorf("tls cert file '%s' is not found", t.CertFile)
+	case t.KeyFile == "" || !utils.IsFileExist(t.KeyFile):
+		err = fmt.Errorf("tls key file '%s' is not found", t.KeyFile)
+	default:
+		for _, cipher := range t.Ciphers {
+			if _, ok := tlsutil.TLS_CIPHER_SUITE_MAP[cipher]; !ok {
+				err = fmt.Errorf("cipher %s not exist", cipher)
+				break
+			}
+		}
+	}
+
+	if err != nil {
+		log.Error("verify tls configuration failed", err)
+	}
+	return err
+}
+
+// ClientTlsConfig returns the client-side tls.Config for these settings.
+//
+// It returns (nil, nil) when TLS is disabled. The config is built on the first
+// call under t.mux and the result is stored in t.clientTlsConfig; later calls
+// return the stored value when it is non-nil.
+func (t *TLSConfig) ClientTlsConfig() (*tls.Config, error) {
+	if !t.Enabled {
+		return nil, nil
+	}
+
+	t.mux.Lock()
+	defer t.mux.Unlock()
+
+	if cached := t.clientTlsConfig; cached != nil {
+		return cached, nil
+	}
+
+	options := tlsutil.DefaultClientTLSOptions()
+	options = append(options, t.toOptions()...)
+	built, err := tlsutil.GetClientTLSConfig(options...)
+	if err != nil {
+		log.Error("get client tls config failed", err)
+	}
+	t.clientTlsConfig = built
+	return built, err
+}
+
+// ServerTlsConfig returns the server-side tls.Config for these settings.
+//
+// It returns (nil, nil) when TLS is disabled. The config is built from the
+// server defaults of tlsutil overlaid with t.toOptions(), not from the client
+// ones, and is cached in t.serverTlsConfig. The cache is read and written
+// under t.mux, the same lock ClientTlsConfig takes.
+func (t *TLSConfig) ServerTlsConfig() (*tls.Config, error) {
+	if !t.Enabled {
+		return nil, nil
+	}
+
+	t.mux.Lock()
+	defer t.mux.Unlock()
+	if t.serverTlsConfig != nil {
+		return t.serverTlsConfig, nil
+	}
+
+	opts := append(tlsutil.DefaultServerTLSOptions(), t.toOptions()...)
+	conf, err := tlsutil.GetServerTLSConfig(opts...)
+	if err != nil {
+		log.Error("get server tls config failed", err)
+	}
+	t.serverTlsConfig = conf
+	return conf, err
+}
+
+// toOptions translates the fields of t into tlsutil options, in the order:
+// peer verification, protocol versions, cipher suites, key passphrase, CA
+// file, certificate file, key file.
+func (t *TLSConfig) toOptions() []tlsutil.SSLConfigOption {
+	minVersion := tlsutil.ParseSSLProtocol(t.MinVersion)
+	suites := tlsutil.ParseDefaultSSLCipherSuites(strings.Join(t.Ciphers, ","))
+
+	opts := make([]tlsutil.SSLConfigOption, 0, 7)
+	opts = append(opts, tlsutil.WithVerifyPeer(t.VerifyPeer))
+	opts = append(opts, tlsutil.WithVersion(minVersion, tls.VersionTLS12))
+	opts = append(opts, tlsutil.WithCipherSuits(suites))
+	opts = append(opts, tlsutil.WithKeyPass(t.Passphrase))
+	opts = append(opts, tlsutil.WithCA(t.CAFile))
+	opts = append(opts, tlsutil.WithCert(t.CertFile))
+	opts = append(opts, tlsutil.WithKey(t.KeyFile))
+	return opts
+}
+
+// pathFromSSLEnvOrDefault builds the location of the file called path for the
+// given server. With SSL_ROOT set the result is $SSL_ROOT/<server>/<path> with
+// environment variables expanded; otherwise it is
+// <working dir>/certs/<server>/<path>.
+func pathFromSSLEnvOrDefault(server, path string) string {
+	if root := os.Getenv("SSL_ROOT"); root != "" {
+		return os.ExpandEnv(filepath.Join("$SSL_ROOT", server, path))
+	}
+	// The Getwd error is deliberately ignored, as before; whatever directory
+	// string it returns is joined as-is.
+	cwd, _ := os.Getwd()
+	return filepath.Join(cwd, dirName, server, path)
+}
diff --git a/syncer/grpc/client.go b/syncer/grpc/client.go
index f57ab5c..19f49ba 100644
--- a/syncer/grpc/client.go
+++ b/syncer/grpc/client.go
@@ -18,11 +18,13 @@ package grpc
import (
"context"
+ "crypto/tls"
"sync"
"github.com/apache/servicecomb-service-center/pkg/log"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
"google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
)
var (
@@ -37,8 +39,8 @@ type Client struct {
cli pb.SyncClient
}
-func Pull(ctx context.Context, addr string) (*pb.SyncData, error) {
- cli := getClient(addr)
+func Pull(ctx context.Context, addr string, tlsConf *tls.Config) (*pb.SyncData, error) {
+ cli := getClient(addr, tlsConf)
data, err := cli.cli.Pull(ctx, &pb.PullRequest{})
if err != nil {
@@ -63,14 +65,24 @@ func closeClient(addr string) {
}
// GetClient Get the client from the client caches with addr
-func getClient(addr string) *Client {
+func getClient(addr string, tlsConf *tls.Config) *Client {
lock.RLock()
cli, ok := clients[addr]
lock.RUnlock()
if !ok {
+ var conn *grpc.ClientConn
+ var err error
+
log.Infof("Create new grpc connection to %s", addr)
- conn, err := grpc.Dial(addr, grpc.WithInsecure())
+
+ if tlsConf != nil {
+ conn, err = grpc.Dial(addr, grpc.WithTransportCredentials(credentials.NewTLS(tlsConf)))
+ } else {
+ conn, err = grpc.Dial(addr, grpc.WithInsecure())
+ }
+
if err != nil {
+ log.Error("create grpc client conn failed", err)
return nil
}
cli = &Client{conn: conn, cli: pb.NewSyncClient(conn), addr: addr}
diff --git a/syncer/grpc/server.go b/syncer/grpc/server.go
index 086b657..53007c5 100644
--- a/syncer/grpc/server.go
+++ b/syncer/grpc/server.go
@@ -19,12 +19,14 @@ package grpc
import (
"context"
+ "crypto/tls"
"net"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
"google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
)
type GRPCHandler interface {
@@ -39,15 +41,17 @@ type Server struct {
handler GRPCHandler
readyCh chan struct{}
errorCh chan error
+ tlsConf *tls.Config
}
// NewServer new grpc server
-func NewServer(addr string, handler GRPCHandler) *Server {
+func NewServer(addr string, handler GRPCHandler, tlsConf *tls.Config) *Server {
return &Server{
addr: addr,
handler: handler,
readyCh: make(chan struct{}),
errorCh: make(chan error),
+ tlsConf: tlsConf,
}
}
@@ -68,7 +72,13 @@ func (s *Server) Stop() {
func (s *Server) Start(ctx context.Context) {
lsn, err := net.Listen("tcp", s.addr)
if err == nil {
- svc := grpc.NewServer()
+ var svc *grpc.Server
+ if s.tlsConf != nil {
+ svc = grpc.NewServer(grpc.Creds(credentials.NewTLS(s.tlsConf)))
+ }else{
+ svc = grpc.NewServer()
+ }
+
pb.RegisterSyncServer(svc, s)
s.lsn = lsn
gopool.Go(func(ctx context.Context) {
diff --git a/syncer/serf/config.go b/syncer/serf/config.go
index 8a25908..e954862 100644
--- a/syncer/serf/config.go
+++ b/syncer/serf/config.go
@@ -37,6 +37,7 @@ const (
tagKeyClusterName = "syncer-cluster-name"
TagKeyClusterPort = "syncer-cluster-port"
TagKeyRPCPort = "syncer-rpc-port"
+ TagKeyTLSEnabled = "syncer-tls-enabled"
)
// DefaultConfig default config
@@ -61,8 +62,9 @@ type Config struct {
ClusterName string `json:"cluster_name"`
// port to communicate between cluster members
- ClusterPort int `yaml:"cluster_port"`
- RPCPort int `yaml:"-"`
+ ClusterPort int `yaml:"cluster_port"`
+ RPCPort int `yaml:"-"`
+ TLSEnabled bool `json:"-"`
}
// readConfigFile reads configuration from config file
@@ -96,7 +98,10 @@ func (c *Config) convertToSerf() (*serf.Config, error) {
serfConf.MemberlistConfig.BindAddr = bindIP
serfConf.MemberlistConfig.BindPort = bindPort
serfConf.NodeName = c.NodeName
- serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)}
+ serfConf.Tags = map[string]string{
+ TagKeyRPCPort: strconv.Itoa(c.RPCPort),
+ TagKeyTLSEnabled: strconv.FormatBool(c.TLSEnabled),
+ }
if c.ClusterName != "" {
serfConf.Tags[tagKeyClusterName] = c.ClusterName
diff --git a/syncer/server/handler.go b/syncer/server/handler.go
index 8760a83..a913331 100644
--- a/syncer/server/handler.go
+++ b/syncer/server/handler.go
@@ -18,14 +18,16 @@ package server
import (
"context"
+ "crypto/tls"
"fmt"
+ "strconv"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/syncer/grpc"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
- "github.com/hashicorp/serf/serf"
myserf "github.com/apache/servicecomb-service-center/syncer/serf"
+ "github.com/hashicorp/serf/serf"
)
const (
@@ -93,7 +95,21 @@ func (s *Server) userEvent(event serf.UserEvent) {
// Get dta from remote member
endpoint := fmt.Sprintf("%s:%s", members[0].Addr, members[0].Tags[myserf.TagKeyRPCPort])
log.Debugf("Going to pull data from %s %s", members[0].Name, endpoint)
- data, err := grpc.Pull(context.Background(), endpoint)
+
+ enabled, err := strconv.ParseBool(members[0].Tags[myserf.TagKeyTLSEnabled])
+ if err != nil {
+ log.Warnf("get tls enabled failed, err = %s", err)
+ }
+ var tlsConfig *tls.Config
+ if enabled {
+ tlsConfig, err = s.conf.TLSConfig.ServerTlsConfig()
+ if err != nil {
+ log.Error("get grpc client tls config failed", err)
+ return
+ }
+ }
+
+ data, err := grpc.Pull(context.Background(), endpoint, tlsConfig)
if err != nil {
log.Errorf(err, "Pull other serf instances failed, node name is '%s'", members[0].Name)
return
diff --git a/syncer/server/server.go b/syncer/server/server.go
index afed8d0..58e62c8 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -190,7 +190,12 @@ func (s *Server) initialization() (err error) {
return
}
- s.grpc = grpc.NewServer(s.conf.RPCAddr, s)
+ tlsConfig, err := s.conf.TLSConfig.ServerTlsConfig()
+ if err != nil {
+ log.Error("get grpc server tls config failed", err)
+ return
+ }
+ s.grpc = grpc.NewServer(s.conf.RPCAddr, s, tlsConfig)
return nil
}