You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/04/13 03:59:58 UTC
[incubator-servicecomb-service-center] branch master updated:
SCB-472 Null point reference in zipkin plugin (#325)
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 6c4143c SCB-472 Null point reference in zipkin plugin (#325)
6c4143c is described below
commit 6c4143ca52917cbdf3933b59864b91aedbb14e2d
Author: little-cui <su...@qq.com>
AuthorDate: Fri Apr 13 11:59:56 2018 +0800
SCB-472 Null point reference in zipkin plugin (#325)
* SCB-472 Null point reference in zipkin plugin
* SCB-472 add timeout
* SCB-472 remove type convert
* SCB-472 catch goroutine panic
* SCB-472 catch goroutine panic
* SCB-472 Fix the UT failures
* SCB-472 Fix the UT failures
---
etc/conf/app.conf | 12 +--
main.go | 12 ++-
pkg/async/async_task.go | 29 ++++--
pkg/chain/callback.go | 6 +-
pkg/etcdsync/mutex.go | 8 +-
pkg/grace/grace.go | 29 +++---
pkg/util/goroutines.go | 48 +++++-----
pkg/util/goroutines_test.go | 95 +++++++++----------
pkg/util/log.go | 13 +--
pkg/util/logrotate.go | 5 +-
server/api.go | 22 +++--
server/broker/service.go | 2 +-
server/broker/store.go | 32 ++++++-
server/broker/util.go | 22 +++--
server/core/0_init.go | 8 +-
server/core/backend/store/cacher.go | 35 +++----
server/core/backend/store/defer.go | 7 +-
server/core/backend/store/indexer.go | 6 +-
server/core/backend/store/listwatch.go | 10 +-
server/core/backend/store/store.go | 16 +++-
server/infra/registry/registry.go | 4 +-
.../infra/registry/embededetcd/embededetcd.go | 84 +++++++++--------
.../plugin/infra/tracing/buildin/file_collector.go | 101 ++++++++++++---------
.../infra/tracing/buildin/file_collector_test.go | 21 ++---
server/plugin/infra/tracing/buildin/span.go | 4 +-
server/plugin/infra/tracing/buildin/span_test.go | 8 ++
server/server.go | 14 +--
server/service/event/dependency_event_handler.go | 4 +-
server/service/instances.go | 35 +++----
server/service/microservices.go | 53 ++++++-----
server/service/notification/listwatcher.go | 9 +-
.../service/notification/notification_service.go | 86 +++++++++++-------
server/service/notification/watch_util.go | 39 ++++----
server/service/util/dependency.go | 10 +-
34 files changed, 491 insertions(+), 398 deletions(-)
diff --git a/etc/conf/app.conf b/etc/conf/app.conf
index 02786d9..d468dbe 100644
--- a/etc/conf/app.conf
+++ b/etc/conf/app.conf
@@ -109,6 +109,12 @@ log_format = text
log_sys = false
###################################################################
+# Frontend Configurations
+###################################################################
+frontend_host_ip=127.0.0.1
+frontend_host_port=30103
+
+###################################################################
# above is the global configurations
# you can override above configuration in specific env
###################################################################
@@ -120,9 +126,3 @@ logfile = ./service-center.log
[dev]
loglevel = DEBUG
logfile = ""
-
-###################################################################
-# Frontend Configurations
-###################################################################
-frontend_host_ip=127.0.0.1
-frontend_host_port=30103
diff --git a/main.go b/main.go
index d76e34c..3d7ac97 100644
--- a/main.go
+++ b/main.go
@@ -18,8 +18,18 @@ package main
// plugins
import _ "github.com/apache/incubator-servicecomb-service-center/server/bootstrap"
-import "github.com/apache/incubator-servicecomb-service-center/server"
+import (
+ "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+ "github.com/apache/incubator-servicecomb-service-center/server"
+ "github.com/apache/incubator-servicecomb-service-center/server/core/backend"
+)
func main() {
server.Run()
+
+ util.GoCloseAndWait()
+
+ backend.Registry().Close()
+
+ util.Logger().Warn("service center exited", nil)
}
diff --git a/pkg/async/async_task.go b/pkg/async/async_task.go
index 14727a1..73ceb38 100644
--- a/pkg/async/async_task.go
+++ b/pkg/async/async_task.go
@@ -39,6 +39,7 @@ type scheduler struct {
queue *util.UniQueue
latestTask AsyncTask
once sync.Once
+ goroutine *util.GoRoutine
}
func (s *scheduler) AddTask(ctx context.Context, task AsyncTask) (err error) {
@@ -47,7 +48,7 @@ func (s *scheduler) AddTask(ctx context.Context, task AsyncTask) (err error) {
}
s.once.Do(func() {
- go s.do()
+ s.goroutine.Do(s.do)
})
err = s.queue.Put(ctx, task)
@@ -57,15 +58,17 @@ func (s *scheduler) AddTask(ctx context.Context, task AsyncTask) (err error) {
return s.latestTask.Err()
}
-func (s *scheduler) do() {
+func (s *scheduler) do(ctx context.Context) {
for {
select {
+ case <-ctx.Done():
+ return
case task, ok := <-s.queue.Chan():
if !ok {
return
}
at := task.(AsyncTask)
- at.Do(context.Background())
+ at.Do(ctx)
s.latestTask = at
}
}
@@ -73,6 +76,15 @@ func (s *scheduler) do() {
func (s *scheduler) Close() {
s.queue.Close()
+ s.goroutine.Close(true)
+}
+
+func newScheduler(task AsyncTask) *scheduler {
+ return &scheduler{
+ queue: util.NewUniQueue(),
+ latestTask: task,
+ goroutine: util.NewGo(context.Background()),
+ }
}
type AsyncTaskService struct {
@@ -99,10 +111,7 @@ func (lat *AsyncTaskService) getOrNewScheduler(task AsyncTask) (s *scheduler, is
s, ok = lat.schedules[key]
if !ok {
isNew = true
- s = &scheduler{
- queue: util.NewUniQueue(),
- latestTask: task,
- }
+ s = newScheduler(task)
lat.schedules[key] = s
}
lat.lock.Unlock()
@@ -166,11 +175,11 @@ func (lat *AsyncTaskService) LatestHandled(key string) (AsyncTask, error) {
return s.latestTask, nil
}
-func (lat *AsyncTaskService) daemon(stopCh <-chan struct{}) {
+func (lat *AsyncTaskService) daemon(ctx context.Context) {
util.SafeCloseChan(lat.ready)
for {
select {
- case <-stopCh:
+ case <-ctx.Done():
util.Logger().Debugf("daemon thread exited for AsyncTaskService is stopped")
return
case <-time.After(DEFAULT_REMOVE_TASKS_INTERVAL):
@@ -228,7 +237,7 @@ func NewAsyncTaskService() *AsyncTaskService {
return &AsyncTaskService{
schedules: make(map[string]*scheduler, DEFAULT_MAX_SCHEDULE_COUNT),
removeTasks: make(map[string]struct{}, DEFAULT_MAX_SCHEDULE_COUNT),
- goroutine: util.NewGo(make(chan struct{})),
+ goroutine: util.NewGo(context.Background()),
ready: make(chan struct{}),
isClose: true,
}
diff --git a/pkg/chain/callback.go b/pkg/chain/callback.go
index 9b2e7fa..3657740 100644
--- a/pkg/chain/callback.go
+++ b/pkg/chain/callback.go
@@ -47,11 +47,7 @@ func (cb *Callback) Invoke(r Result) {
}
func syncInvoke(f func(r Result), r Result) {
- defer func() {
- if itf := recover(); itf != nil {
- util.LogPanic(itf)
- }
- }()
+ defer util.RecoverAndReport()
if f == nil {
util.Logger().Errorf(nil, "Callback function is nil. result: %s,", r)
return
diff --git a/pkg/etcdsync/mutex.go b/pkg/etcdsync/mutex.go
index f54ef6f..d1a35b9 100644
--- a/pkg/etcdsync/mutex.go
+++ b/pkg/etcdsync/mutex.go
@@ -134,7 +134,8 @@ func (m *DLock) Lock(wait bool) error {
util.Logger().Warnf(err, "Key %s is locked, waiting for other node releases it, id=%s", m.builder.key, m.id)
ctx, cancel := context.WithTimeout(m.builder.ctx, DEFAULT_LOCK_TTL*time.Second)
- go func() {
+ util.Go(func(context.Context) {
+ defer cancel()
err := backend.Registry().Watch(ctx,
registry.WithStrKey(m.builder.key),
registry.WithWatchCallback(
@@ -146,10 +147,9 @@ func (m *DLock) Lock(wait bool) error {
return nil
}))
if err != nil {
- util.Logger().Errorf(nil, "%s, key=%s, id=%s", err.Error(), m.builder.key, m.id)
+ util.Logger().Warnf(nil, "%s, key=%s, id=%s", err.Error(), m.builder.key, m.id)
}
- cancel()
- }()
+ })
select {
case <-ctx.Done():
continue // the lock can be re-attempted
diff --git a/pkg/grace/grace.go b/pkg/grace/grace.go
index fef6caf..3ec7cd3 100644
--- a/pkg/grace/grace.go
+++ b/pkg/grace/grace.go
@@ -20,6 +20,7 @@ import (
"flag"
"fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
+ "golang.org/x/net/context"
"os"
"os/exec"
"os/signal"
@@ -71,7 +72,7 @@ func Init() {
flag.Parse()
}
- go handleSignals()
+ util.Go(handleSignals)
}
func Before(f func()) {
@@ -111,26 +112,28 @@ func fireSignalHook(ppFlag int, sig os.Signal) {
}
}
-func handleSignals() {
+func handleSignals(ctx context.Context) {
var sig os.Signal
sigCh := make(chan os.Signal)
signal.Notify(sigCh, registerSignals...)
for {
- sig = <-sigCh
- fireSignalHook(PreSignal, sig)
- switch sig {
- case syscall.SIGHUP:
- util.Logger().Debugf("received signal 'SIGHUP', now forking")
- err := fork()
- if err != nil {
- util.Logger().Errorf(err, "fork a process failed")
+ select {
+ case <-ctx.Done():
+ return
+ case sig = <-sigCh:
+ fireSignalHook(PreSignal, sig)
+ switch sig {
+ case syscall.SIGHUP:
+ util.Logger().Debugf("received signal '%v', now forking", sig)
+ err := fork()
+ if err != nil {
+ util.Logger().Errorf(err, "fork a process failed")
+ }
}
- default:
- util.Logger().Warnf(nil, "received signal '%v'", sig)
+ fireSignalHook(PostSignal, sig)
}
- fireSignalHook(PostSignal, sig)
}
}
diff --git a/pkg/util/goroutines.go b/pkg/util/goroutines.go
index a021f52..bb9b9d9 100644
--- a/pkg/util/goroutines.go
+++ b/pkg/util/goroutines.go
@@ -16,42 +16,37 @@
*/
package util
-import "sync"
+import (
+ "golang.org/x/net/context"
+ "sync"
+)
type GoRoutine struct {
- stopCh chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
wg sync.WaitGroup
mux sync.RWMutex
- once sync.Once
closed bool
}
-func (g *GoRoutine) Init(stopCh chan struct{}) {
- g.once.Do(func() {
- g.stopCh = stopCh
- })
-}
-
-func (g *GoRoutine) StopCh() <-chan struct{} {
- return g.stopCh
-}
-
-func (g *GoRoutine) Do(f func(<-chan struct{})) {
+func (g *GoRoutine) Do(f func(context.Context)) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
- f(g.StopCh())
+ defer RecoverAndReport()
+ f(g.ctx)
}()
}
func (g *GoRoutine) Close(wait bool) {
g.mux.Lock()
defer g.mux.Unlock()
+
if g.closed {
return
}
g.closed = true
- close(g.stopCh)
+ g.cancel()
if wait {
g.Wait()
}
@@ -61,27 +56,26 @@ func (g *GoRoutine) Wait() {
g.wg.Wait()
}
-var defaultGo GoRoutine
+var defaultGo *GoRoutine
func init() {
- GoInit()
+ defaultGo = NewGo(context.Background())
}
-func Go(f func(<-chan struct{})) {
+func Go(f func(context.Context)) {
defaultGo.Do(f)
}
-func GoInit() {
- defaultGo.Init(make(chan struct{}))
-}
-
func GoCloseAndWait() {
defaultGo.Close(true)
- Logger().Debugf("all goroutines quit normally")
+ Logger().Debugf("all goroutines exited")
}
-func NewGo(stopCh chan struct{}) *GoRoutine {
- gr := &GoRoutine{}
- gr.Init(stopCh)
+func NewGo(ctx context.Context) *GoRoutine {
+ ctx, cancel := context.WithCancel(ctx)
+ gr := &GoRoutine{
+ ctx: ctx,
+ cancel: cancel,
+ }
return gr
}
diff --git a/pkg/util/goroutines_test.go b/pkg/util/goroutines_test.go
index cfc0919..d808814 100644
--- a/pkg/util/goroutines_test.go
+++ b/pkg/util/goroutines_test.go
@@ -18,72 +18,66 @@ package util
import (
"fmt"
+ "golang.org/x/net/context"
"sync"
"testing"
"time"
)
-func TestGoRoutine_Init(t *testing.T) {
- var test GoRoutine
- stopCh1 := make(chan struct{})
- defer close(stopCh1)
- stopCh2 := make(chan struct{})
- defer close(stopCh2)
-
- test.Init(stopCh1)
- c := test.StopCh()
- if c != stopCh1 {
- fail(t, "init GoRoutine failed.")
- }
-
- test.Init(stopCh2)
- c = test.StopCh()
- if c == stopCh2 {
- fail(t, "init GoRoutine twice.")
- }
-}
-
func TestGoRoutine_Do(t *testing.T) {
- var test1 GoRoutine
- stopCh := make(chan struct{})
- test1.Init(make(chan struct{}))
- test1.Do(func(neverStopCh <-chan struct{}) {
- defer close(stopCh)
+ test1 := NewGo(context.Background())
+ defer test1.Close(true)
+ stopCh1 := make(chan struct{})
+ test1.Do(func(ctx context.Context) {
+ defer close(stopCh1)
select {
- case <-neverStopCh:
- fail(t, "neverStopCh should not be closed.")
+ case <-ctx.Done():
+ fail(t, "ctx should not be done.")
case <-time.After(time.Second):
}
})
- <-stopCh
+ <-stopCh1
- var test2 GoRoutine
- stopCh1 := make(chan struct{})
+ ctx, cancel := context.WithCancel(context.Background())
+ test2 := NewGo(ctx)
+ defer test2.Close(true)
stopCh2 := make(chan struct{})
- test2.Init(stopCh1)
- test2.Do(func(stopCh <-chan struct{}) {
+ test2.Do(func(ctx context.Context) {
defer close(stopCh2)
select {
- case <-stopCh:
+ case <-ctx.Done():
case <-time.After(time.Second):
- fail(t, "time out to wait stopCh1 close.")
+ fail(t, "time out to wait stopCh2 close.")
}
})
- close(stopCh1)
+ cancel()
<-stopCh2
+
+ ctx, _ = context.WithTimeout(context.Background(), 0)
+ test3 := NewGo(ctx)
+ defer test3.Close(true)
+ stopCh3 := make(chan struct{})
+ test3.Do(func(ctx context.Context) {
+ defer close(stopCh3)
+ select {
+ case <-ctx.Done():
+ case <-time.After(time.Second):
+ fail(t, "time out to wait ctx done.")
+ }
+ })
+ <-stopCh3
}
func TestGoRoutine_Wait(t *testing.T) {
- var test GoRoutine
var mux sync.Mutex
MAX := 10
resultArr := make([]int, 0, MAX)
- test.Init(make(chan struct{}))
+ test := NewGo(context.Background())
for i := 0; i < MAX; i++ {
func(i int) {
- test.Do(func(neverStopCh <-chan struct{}) {
+ test.Do(func(ctx context.Context) {
select {
- case <-neverStopCh:
+ case <-ctx.Done():
case <-time.After(time.Second):
mux.Lock()
resultArr = append(resultArr, i)
@@ -103,13 +97,12 @@ func TestGoRoutine_Wait(t *testing.T) {
}
func TestGoRoutine_Close(t *testing.T) {
- var test GoRoutine
- test.Init(make(chan struct{}))
- test.Do(func(stopCh <-chan struct{}) {
+ test := NewGo(context.Background())
+ test.Do(func(ctx context.Context) {
select {
- case <-stopCh:
+ case <-ctx.Done():
case <-time.After(time.Second):
- fail(t, "time out to wait stopCh close.")
+ fail(t, "time out to wait ctx close.")
}
})
test.Close(true)
@@ -117,20 +110,18 @@ func TestGoRoutine_Close(t *testing.T) {
}
func TestGo(t *testing.T) {
- GoInit()
- Go(func(stopCh <-chan struct{}) {
+ Go(func(ctx context.Context) {
for {
select {
- case <-stopCh:
+ case <-ctx.Done():
return
case <-time.After(time.Second):
}
}
})
+ Go(func(ctx context.Context) {
+ var a *int
+ fmt.Println(*a)
+ })
GoCloseAndWait()
}
-
-func TestNewGo(t *testing.T) {
- g := NewGo(make(chan struct{}))
- defer g.Close(true)
-}
diff --git a/pkg/util/log.go b/pkg/util/log.go
index 5e475a7..bc82a89 100644
--- a/pkg/util/log.go
+++ b/pkg/util/log.go
@@ -20,6 +20,7 @@ import (
"fmt"
"github.com/ServiceComb/paas-lager"
"github.com/ServiceComb/paas-lager/third_party/forked/cloudfoundry/lager"
+ "golang.org/x/net/context"
"os"
"path/filepath"
"runtime"
@@ -57,7 +58,7 @@ func init() {
loggers = make(map[string]lager.Logger, 10)
loggerNames = make(map[string]string, 10)
// make LOGGER do not be nil, new a stdout logger
- LOGGER = newLogger(fromLagerConfig(defaultLagerConfig))
+ LOGGER = NewLogger(fromLagerConfig(defaultLagerConfig))
}
func fromLagerConfig(c *stlager.Config) LoggerConfig {
@@ -82,7 +83,7 @@ func toLagerConfig(c LoggerConfig) stlager.Config {
}
// newLog new log, unsafe
-func newLogger(cfg LoggerConfig) lager.Logger {
+func NewLogger(cfg LoggerConfig) lager.Logger {
stlager.Init(toLagerConfig(cfg))
return stlager.NewLogger(cfg.LoggerFile)
}
@@ -93,7 +94,7 @@ func InitGlobalLogger(cfg LoggerConfig) {
cfg.LoggerLevel = defaultLagerConfig.LoggerLevel
}
loggerConfig = cfg
- LOGGER = newLogger(cfg)
+ LOGGER = NewLogger(cfg)
// log rotate
RunLogDirRotate(cfg)
// recreate the deleted log file
@@ -144,7 +145,7 @@ func Logger() lager.Logger {
if len(cfg.LoggerFile) != 0 {
cfg.LoggerFile = filepath.Join(filepath.Dir(cfg.LoggerFile), logFile+".log")
}
- logger = newLogger(cfg)
+ logger = NewLogger(cfg)
loggers[logFile] = logger
LOGGER.Warnf(nil, "match %s, new logger %s for %s", prefix, logFile, funcFullName)
}
@@ -190,10 +191,10 @@ func monitorLogFile() {
if len(loggerConfig.LoggerFile) == 0 {
return
}
- Go(func(stopCh <-chan struct{}) {
+ Go(func(ctx context.Context) {
for {
select {
- case <-stopCh:
+ case <-ctx.Done():
return
case <-time.After(time.Minute):
Logger().Debug(fmt.Sprintf("Check log file at %s", time.Now()))
diff --git a/pkg/util/logrotate.go b/pkg/util/logrotate.go
index db4b79f..e7dbcef 100644
--- a/pkg/util/logrotate.go
+++ b/pkg/util/logrotate.go
@@ -19,6 +19,7 @@ package util
import (
"archive/zip"
"fmt"
+ "golang.org/x/net/context"
"io"
"os"
"path/filepath"
@@ -293,10 +294,10 @@ func CopyFile(srcFile, destFile string) error {
}
func RunLogDirRotate(cfg LoggerConfig) {
- Go(func(stopCh <-chan struct{}) {
+ Go(func(ctx context.Context) {
for {
select {
- case <-stopCh:
+ case <-ctx.Done():
return
case <-time.After(cfg.LogRotatePeriod):
LogRotate(filepath.Dir(cfg.LoggerFile), cfg.LogRotateSize, cfg.LogBackupCount)
diff --git a/server/api.go b/server/api.go
index 26ac55a..f434bdc 100644
--- a/server/api.go
+++ b/server/api.go
@@ -36,8 +36,9 @@ func init() {
InitAPI()
apiServer = &APIServer{
- isClose: true,
- err: make(chan error, 1),
+ isClose: true,
+ err: make(chan error, 1),
+ goroutine: util.NewGo(context.Background()),
}
}
@@ -66,6 +67,7 @@ type APIServer struct {
isClose bool
forked bool
err chan error
+ goroutine *util.GoRoutine
}
const (
@@ -176,16 +178,18 @@ func (s *APIServer) doAPIServerHeartBeat(pCtx context.Context) {
}
func (s *APIServer) startHeartBeatService() {
- go func() {
+ s.goroutine.Do(func(ctx context.Context) {
for {
select {
+ case <-ctx.Done():
+ return
case <-s.err:
return
case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second):
s.doAPIServerHeartBeat(context.Background())
}
}
- }()
+ })
}
func (s *APIServer) graceDone() {
@@ -211,14 +215,14 @@ func (s *APIServer) startRESTServer() (err error) {
}
util.Logger().Infof("Local listen address: %s, host: %s.", ep, s.HostName)
- go func() {
+ s.goroutine.Do(func(_ context.Context) {
err := s.restSrv.Serve()
if s.isClose {
return
}
util.Logger().Errorf(err, "error to start REST API server %s", ep)
s.err <- err
- }()
+ })
return
}
@@ -234,14 +238,14 @@ func (s *APIServer) startRPCServer() (err error) {
}
util.Logger().Infof("Local listen address: %s, host: %s.", ep, s.HostName)
- go func() {
+ s.goroutine.Do(func(_ context.Context) {
err := s.rpcSrv.Serve()
if s.isClose {
return
}
util.Logger().Errorf(err, "error to start RPC API server %s", ep)
s.err <- err
- }()
+ })
return
}
@@ -301,6 +305,8 @@ func (s *APIServer) Stop() {
close(s.err)
+ s.goroutine.Close(true)
+
util.Logger().Info("api server stopped.")
}
diff --git a/server/broker/service.go b/server/broker/service.go
index bde8f3e..3ae63c6 100644
--- a/server/broker/service.go
+++ b/server/broker/service.go
@@ -34,7 +34,7 @@ import (
"golang.org/x/net/context"
)
-var BrokerServiceAPI *BrokerService = &BrokerService{}
+var BrokerServiceAPI = &BrokerService{}
type BrokerService struct {
}
diff --git a/server/broker/store.go b/server/broker/store.go
index 0cdddc5..95c61a4 100644
--- a/server/broker/store.go
+++ b/server/broker/store.go
@@ -21,6 +21,7 @@ import (
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
sstore "github.com/apache/incubator-servicecomb-service-center/server/core/backend/store"
+ "golang.org/x/net/context"
)
const (
@@ -72,12 +73,16 @@ func (s *BKvStore) newStore(t sstore.StoreType, opts ...sstore.KvCacherCfgOption
s.newIndexer(t, sstore.NewKvCacher(opts...))
}
-func (s *BKvStore) store() {
+func (s *BKvStore) store(ctx context.Context) {
for t := sstore.StoreType(0); t != typeEnd; t++ {
s.newStore(t)
}
for _, i := range s.bindexers {
- <-i.Ready()
+ select {
+ case <-ctx.Done():
+ return
+ case <-i.Ready():
+ }
}
util.SafeCloseChan(s.bready)
@@ -120,7 +125,13 @@ func (s *BKvStore) newIndexer(t sstore.StoreType, cacher sstore.Cacher) {
}
func (s *BKvStore) Run() {
- go s.store()
+ util.Go(func(ctx context.Context) {
+ s.store(ctx)
+ select {
+ case <-ctx.Done():
+ s.Stop()
+ }
+ })
}
func (s *BKvStore) Ready() <-chan struct{} {
@@ -154,3 +165,18 @@ func (s *BKvStore) Verification() *sstore.Indexer {
func (s *BKvStore) PactLatest() *sstore.Indexer {
return s.bindexers[PACT_LATEST]
}
+
+func (s *BKvStore) Stop() {
+ if s.bisClose {
+ return
+ }
+ s.bisClose = true
+
+ for _, i := range s.bindexers {
+ i.Stop()
+ }
+
+ util.SafeCloseChan(s.bready)
+
+ util.Logger().Debugf("broker store daemon stopped")
+}
diff --git a/server/broker/util.go b/server/broker/util.go
index 23e98ca..895ee8c 100644
--- a/server/broker/util.go
+++ b/server/broker/util.go
@@ -25,14 +25,16 @@ import (
"strconv"
"strings"
- "github.com/ServiceComb/paas-lager"
"github.com/ServiceComb/paas-lager/third_party/forked/cloudfoundry/lager"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
- backend "github.com/apache/incubator-servicecomb-service-center/server/core/backend"
+ "github.com/apache/incubator-servicecomb-service-center/server/core"
+ "github.com/apache/incubator-servicecomb-service-center/server/core/backend"
pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
scerr "github.com/apache/incubator-servicecomb-service-center/server/error"
"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
serviceUtil "github.com/apache/incubator-servicecomb-service-center/server/service/util"
+ "path/filepath"
+ "time"
)
var PactLogger lager.Logger
@@ -88,12 +90,18 @@ var brokerAPILinksTitles = map[string]string{
func init() {
//define Broker logger
- stlager.Init(stlager.Config{
- LoggerLevel: "INFO",
- LoggerFile: "broker_srvc.log",
- EnableRsyslog: false,
+ name := ""
+ if len(core.ServerInfo.Config.LogFilePath) != 0 {
+ name = filepath.Join(filepath.Dir(core.ServerInfo.Config.LogFilePath), "broker_srvc.log")
+ }
+ PactLogger = util.NewLogger(util.LoggerConfig{
+ LoggerLevel: core.ServerInfo.Config.LogLevel,
+ LoggerFile: name,
+ LogFormatText: core.ServerInfo.Config.LogFormat == "text",
+ LogRotatePeriod: 30 * time.Second,
+ LogRotateSize: int(core.ServerInfo.Config.LogRotateSize),
+ LogBackupCount: int(core.ServerInfo.Config.LogBackupCount),
})
- PactLogger = stlager.NewLogger("broker_srvc")
}
func GetDefaultTenantProject() string {
diff --git a/server/core/0_init.go b/server/core/0_init.go
index 1b88849..8749af6 100644
--- a/server/core/0_init.go
+++ b/server/core/0_init.go
@@ -87,7 +87,6 @@ func initLogger() {
}
func handleSignals() {
- var sig os.Signal
sigCh := make(chan os.Signal)
signal.Notify(sigCh,
syscall.SIGINT,
@@ -95,13 +94,14 @@ func handleSignals() {
syscall.SIGTERM,
)
wait := 5 * time.Second
- for {
- sig = <-sigCh
+ for sig := range sigCh {
switch sig {
case syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM:
<-time.After(wait)
- util.Logger().Warnf(nil, "Waiting for server response timed out(%s), force shutdown.", wait)
+ util.Logger().Warnf(nil, "waiting for server response timed out(%s), force shutdown", wait)
os.Exit(1)
+ default:
+ util.Logger().Warnf(nil, "received signal '%v'", sig)
}
}
}
diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go
index c6ade97..39ba733 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -169,12 +169,12 @@ type KvCacher struct {
lastRev int64
noEventInterval int
- ready chan struct{}
- lw ListWatcher
- mux sync.Mutex
- once sync.Once
- cache *KvCache
- goroute *util.GoRoutine
+ ready chan struct{}
+ lw ListWatcher
+ mux sync.Mutex
+ once sync.Once
+ cache *KvCache
+ goroutine *util.GoRoutine
}
func (c *KvCacher) needList() bool {
@@ -267,23 +267,18 @@ func (c *KvCacher) needDeferHandle(evts []*Event) bool {
return c.Cfg.DeferHandler.OnCondition(c.Cache(), evts)
}
-func (c *KvCacher) refresh(stopCh <-chan struct{}) {
+func (c *KvCacher) refresh(ctx context.Context) {
util.Logger().Debugf("start to list and watch %s", c.Cfg)
- ctx, cancel := context.WithCancel(context.Background())
- c.goroute.Do(func(stopCh <-chan struct{}) {
- defer cancel()
- <-stopCh
- })
for {
start := time.Now()
c.ListAndWatch(ctx)
watchDuration := time.Since(start)
- nextPeriod := 0 * time.Second
+ nextPeriod := c.Cfg.Period
if watchDuration > 0 && c.Cfg.Period > watchDuration {
nextPeriod = c.Cfg.Period - watchDuration
}
select {
- case <-stopCh:
+ case <-ctx.Done():
util.Logger().Debugf("stop to list and watch %s", c.Cfg)
return
case <-time.After(nextPeriod):
@@ -291,7 +286,7 @@ func (c *KvCacher) refresh(stopCh <-chan struct{}) {
}
}
-func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
+func (c *KvCacher) deferHandle(ctx context.Context) {
if c.Cfg.DeferHandler == nil {
return
}
@@ -299,7 +294,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
i, evts := 0, make([]*Event, event_block_size)
for {
select {
- case <-stopCh:
+ case <-ctx.Done():
return
case evt, ok := <-c.Cfg.DeferHandler.HandleChan():
if !ok {
@@ -524,8 +519,8 @@ func (c *KvCacher) onKvEvents(evts []*KvEvent) {
}
func (c *KvCacher) run() {
- c.goroute.Do(c.refresh)
- c.goroute.Do(c.deferHandle)
+ c.goroutine.Do(c.refresh)
+ c.goroutine.Do(c.deferHandle)
}
func (c *KvCacher) Cache() Cache {
@@ -537,7 +532,7 @@ func (c *KvCacher) Run() {
}
func (c *KvCacher) Stop() {
- c.goroute.Close(true)
+ c.goroutine.Close(true)
util.SafeCloseChan(c.ready)
}
@@ -577,7 +572,7 @@ func NewKvCacher(opts ...KvCacherCfgOption) *KvCacher {
Client: backend.Registry(),
Key: cfg.Key,
},
- goroute: util.NewGo(make(chan struct{})),
+ goroutine: util.NewGo(context.Background()),
}
cacher.cache = NewKvCache(cacher, cfg.InitSize)
return cacher
diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer.go
index d35f973..43464f0 100644
--- a/server/core/backend/store/defer.go
+++ b/server/core/backend/store/defer.go
@@ -21,6 +21,7 @@ import (
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
"github.com/coreos/etcd/mvcc/mvccpb"
+ "golang.org/x/net/context"
"sync"
"time"
)
@@ -98,12 +99,12 @@ func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event {
return iedh.deferCh
}
-func (iedh *InstanceEventDeferHandler) check(stopCh <-chan struct{}) {
+func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
defer util.RecoverAndReport()
t, n := iedh.newTimer(), false
for {
select {
- case <-stopCh:
+ case <-ctx.Done():
return
case evts := <-iedh.pendingCh:
for _, evt := range evts {
@@ -117,7 +118,7 @@ func (iedh *InstanceEventDeferHandler) check(stopCh <-chan struct{}) {
}
total := iedh.cache.Size()
- if !iedh.enabled && del > 0 && total > 0 && float64(del) >= float64(total)*iedh.Percent {
+ if !iedh.enabled && del > 0 && total > 5 && float64(del) >= float64(total)*iedh.Percent {
iedh.enabled = true
util.Logger().Warnf(nil, "self preservation is enabled, caught %d/%d(>=%.0f%%) DELETE events",
del, total, iedh.Percent*100)
diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go
index 7236050..5cf3e6d 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -186,11 +186,11 @@ func (i *Indexer) OnCacheEvent(evt *KvEvent) {
}
func (i *Indexer) buildIndex() {
- i.goroutine.Do(func(stopCh <-chan struct{}) {
+ i.goroutine.Do(func(ctx context.Context) {
util.SafeCloseChan(i.ready)
for {
select {
- case <-stopCh:
+ case <-ctx.Done():
return
case evt, ok := <-i.prefixBuildQueue:
if !ok {
@@ -317,7 +317,7 @@ func NewCacheIndexer(t StoreType, cr Cacher) *Indexer {
cacheType: t,
prefixIndex: make(map[string]map[string]struct{}, DEFAULT_MAX_EVENT_COUNT),
prefixBuildQueue: make(chan *KvEvent, DEFAULT_MAX_EVENT_COUNT),
- goroutine: util.NewGo(make(chan struct{})),
+ goroutine: util.NewGo(context.Background()),
ready: make(chan struct{}),
isClose: true,
}
diff --git a/server/core/backend/store/listwatch.go b/server/core/backend/store/listwatch.go
index 189f909..4ffb9d4 100644
--- a/server/core/backend/store/listwatch.go
+++ b/server/core/backend/store/listwatch.go
@@ -131,17 +131,17 @@ func (w *Watcher) EventBus() <-chan []*Event {
return w.bus
}
-func (w *Watcher) process() {
+func (w *Watcher) process(_ context.Context) {
stopCh := make(chan struct{})
ctx, cancel := context.WithTimeout(w.ListOps.Context, w.ListOps.Timeout)
- go func() {
+ util.Go(func(_ context.Context) {
defer close(stopCh)
w.lw.doWatch(ctx, w.sendEvent)
- }()
+ })
select {
case <-stopCh:
- // time out
+ // timed out or exception
w.Stop()
case <-w.stopCh:
cancel()
@@ -180,6 +180,6 @@ func newWatcher(lw *ListWatcher, listOps *ListOptions) *Watcher {
bus: make(chan []*Event, EVENT_BUS_MAX_SIZE),
stopCh: make(chan struct{}),
}
- go w.process()
+ util.Go(w.process)
return w
}
diff --git a/server/core/backend/store/store.go b/server/core/backend/store/store.go
index 6a5f38c..c462d41 100644
--- a/server/core/backend/store/store.go
+++ b/server/core/backend/store/store.go
@@ -109,6 +109,7 @@ type KvStore struct {
asyncTaskSvc *async.AsyncTaskService
lock sync.RWMutex
ready chan struct{}
+ goroutine *util.GoRoutine
isClose bool
}
@@ -116,6 +117,7 @@ func (s *KvStore) Initialize() {
s.indexers = make(map[StoreType]*Indexer)
s.asyncTaskSvc = async.NewAsyncTaskService()
s.ready = make(chan struct{})
+ s.goroutine = util.NewGo(context.Background())
for i := StoreType(0); i != typeEnd; i++ {
store.newNullStore(i)
@@ -147,7 +149,7 @@ func (s *KvStore) newIndexer(t StoreType, cacher Cacher) {
}
func (s *KvStore) Run() {
- go s.store()
+ s.goroutine.Do(s.store)
s.asyncTaskSvc.Run()
}
@@ -166,7 +168,7 @@ func (s *KvStore) SelfPreservationHandler() DeferHandler {
return &InstanceEventDeferHandler{Percent: DEFAULT_SELF_PRESERVATION_PERCENT}
}
-func (s *KvStore) store() {
+func (s *KvStore) store(ctx context.Context) {
for t := StoreType(0); t != typeEnd; t++ {
switch t {
case INSTANCE:
@@ -178,7 +180,11 @@ func (s *KvStore) store() {
}
}
for _, i := range s.indexers {
- <-i.Ready()
+ select {
+ case <-ctx.Done():
+ return
+ case <-i.Ready():
+ }
}
util.SafeCloseChan(s.ready)
@@ -214,9 +220,11 @@ func (s *KvStore) Stop() {
s.asyncTaskSvc.Stop()
+ s.goroutine.Close(true)
+
util.SafeCloseChan(s.ready)
- util.Logger().Debugf("store daemon stopped.")
+ util.Logger().Debugf("store daemon stopped")
}
func (s *KvStore) Ready() <-chan struct{} {
diff --git a/server/infra/registry/registry.go b/server/infra/registry/registry.go
index 575a351..0856fa3 100644
--- a/server/infra/registry/registry.go
+++ b/server/infra/registry/registry.go
@@ -144,7 +144,7 @@ const (
)
const (
- REQUEST_TIMEOUT = 300
+ REQUEST_TIMEOUT = 30 * time.Second
DEFAULT_PAGE_COUNT = 4096 // grpc does not allow transporting a body larger than 4MB in a request.
)
@@ -359,7 +359,7 @@ func OpCmp(opt CompareOperation, result CompareResult, v interface{}) (cmp Compa
}
func WithTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
- return context.WithTimeout(ctx, REQUEST_TIMEOUT*time.Second)
+ return context.WithTimeout(ctx, REQUEST_TIMEOUT)
}
func RegistryConfig() *Config {
diff --git a/server/plugin/infra/registry/embededetcd/embededetcd.go b/server/plugin/infra/registry/embededetcd/embededetcd.go
index 4b941bc..3bb7140 100644
--- a/server/plugin/infra/registry/embededetcd/embededetcd.go
+++ b/server/plugin/infra/registry/embededetcd/embededetcd.go
@@ -41,16 +41,17 @@ import (
var embedTLSConfig *tls.Config
-const START_MANAGER_SERVER_TIMEOUT = 60
+const START_MANAGER_SERVER_TIMEOUT = 10
func init() {
mgr.RegisterPlugin(mgr.Plugin{mgr.REGISTRY, "embeded_etcd", getEmbedInstance})
}
type EtcdEmbed struct {
- Server *embed.Etcd
- err chan error
- ready chan int
+ Embed *embed.Etcd
+ err chan error
+ ready chan int
+ goroutine *util.GoRoutine
}
func (s *EtcdEmbed) Err() <-chan error {
@@ -62,9 +63,10 @@ func (s *EtcdEmbed) Ready() <-chan int {
}
func (s *EtcdEmbed) Close() {
- if s.Server != nil {
- s.Server.Close()
+ if s.Embed != nil {
+ s.Embed.Close()
}
+ s.goroutine.Close(true)
util.Logger().Debugf("embedded etcd client stopped.")
}
@@ -232,7 +234,7 @@ func (s *EtcdEmbed) Compact(ctx context.Context, reserve int64) error {
}
util.Logger().Infof("Compacting... revision is %d(current: %d, reserve %d)", revToCompact, curRev, reserve)
- _, err := s.Server.Server.Compact(ctx, &etcdserverpb.CompactionRequest{
+ _, err := s.Embed.Server.Compact(ctx, &etcdserverpb.CompactionRequest{
Revision: revToCompact,
Physical: true,
})
@@ -250,7 +252,7 @@ func (s *EtcdEmbed) Compact(ctx context.Context, reserve int64) error {
}
func (s *EtcdEmbed) getLeaderCurrentRevision(ctx context.Context) int64 {
- return s.Server.Server.KV().Rev()
+ return s.Embed.Server.KV().Rev()
}
func (s *EtcdEmbed) PutNoOverride(ctx context.Context, opts ...registry.PluginOpOption) (bool, error) {
@@ -275,7 +277,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts ...registry.PluginOpOption) (*r
switch op.Action {
case registry.Get:
var etcdResp *etcdserverpb.RangeResponse
- etcdResp, err = s.Server.Server.Range(otCtx, s.toGetRequest(op))
+ etcdResp, err = s.Embed.Server.Range(otCtx, s.toGetRequest(op))
if err != nil {
break
}
@@ -286,7 +288,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts ...registry.PluginOpOption) (*r
}
case registry.Put:
var etcdResp *etcdserverpb.PutResponse
- etcdResp, err = s.Server.Server.Put(otCtx, s.toPutRequest(op))
+ etcdResp, err = s.Embed.Server.Put(otCtx, s.toPutRequest(op))
if err != nil {
break
}
@@ -295,7 +297,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts ...registry.PluginOpOption) (*r
}
case registry.Delete:
var etcdResp *etcdserverpb.DeleteRangeResponse
- etcdResp, err = s.Server.Server.DeleteRange(otCtx, s.toDeleteRequest(op))
+ etcdResp, err = s.Embed.Server.DeleteRange(otCtx, s.toDeleteRequest(op))
if err != nil {
break
}
@@ -338,7 +340,7 @@ func (s *EtcdEmbed) TxnWithCmp(ctx context.Context, success []registry.PluginOp,
if len(etcdFailOps) > 0 {
txnRequest.Failure = etcdFailOps
}
- resp, err := s.Server.Server.Txn(otCtx, txnRequest)
+ resp, err := s.Embed.Server.Txn(otCtx, txnRequest)
if err != nil {
return nil, err
}
@@ -351,7 +353,7 @@ func (s *EtcdEmbed) TxnWithCmp(ctx context.Context, success []registry.PluginOp,
func (s *EtcdEmbed) LeaseGrant(ctx context.Context, TTL int64) (int64, error) {
otCtx, cancel := registry.WithTimeout(ctx)
defer cancel()
- etcdResp, err := s.Server.Server.LeaseGrant(otCtx, &etcdserverpb.LeaseGrantRequest{
+ etcdResp, err := s.Embed.Server.LeaseGrant(otCtx, &etcdserverpb.LeaseGrantRequest{
TTL: TTL,
})
if err != nil {
@@ -363,7 +365,7 @@ func (s *EtcdEmbed) LeaseGrant(ctx context.Context, TTL int64) (int64, error) {
func (s *EtcdEmbed) LeaseRenew(ctx context.Context, leaseID int64) (int64, error) {
otCtx, cancel := registry.WithTimeout(ctx)
defer cancel()
- ttl, err := s.Server.Server.LeaseRenew(otCtx, lease.LeaseID(leaseID))
+ ttl, err := s.Embed.Server.LeaseRenew(otCtx, lease.LeaseID(leaseID))
if err != nil {
if err.Error() == grpc.ErrorDesc(rpctypes.ErrGRPCLeaseNotFound) {
return 0, err
@@ -376,7 +378,7 @@ func (s *EtcdEmbed) LeaseRenew(ctx context.Context, leaseID int64) (int64, error
func (s *EtcdEmbed) LeaseRevoke(ctx context.Context, leaseID int64) error {
otCtx, cancel := registry.WithTimeout(ctx)
defer cancel()
- _, err := s.Server.Server.LeaseRevoke(otCtx, &etcdserverpb.LeaseRevokeRequest{
+ _, err := s.Embed.Server.LeaseRevoke(otCtx, &etcdserverpb.LeaseRevokeRequest{
ID: leaseID,
})
if err != nil {
@@ -392,7 +394,7 @@ func (s *EtcdEmbed) Watch(ctx context.Context, opts ...registry.PluginOpOption)
op := registry.OpGet(opts...)
if len(op.Key) > 0 {
- watchable := s.Server.Server.Watchable()
+ watchable := s.Embed.Server.Watchable()
ws := watchable.NewWatchStream()
defer ws.Close()
@@ -455,6 +457,29 @@ func (s *EtcdEmbed) Watch(ctx context.Context, opts ...registry.PluginOpOption)
return
}
+func (s *EtcdEmbed) ReadyNotify() {
+ timeout := START_MANAGER_SERVER_TIMEOUT * time.Second
+ select {
+ case <-s.Embed.Server.ReadyNotify():
+ close(s.ready)
+ s.goroutine.Do(func(ctx context.Context) {
+ select {
+ case <-ctx.Done():
+ return
+ case err := <-s.Embed.Err():
+ s.err <- err
+ }
+ })
+ case <-time.After(timeout):
+ err := fmt.Errorf("timed out(%s)", timeout)
+ util.Logger().Errorf(err, "read notify failed")
+
+ s.Embed.Server.Stop()
+
+ s.err <- err
+ }
+}
+
func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *mvccpb.Event) registry.ActionType {
switch evt.Type {
case mvccpb.DELETE:
@@ -488,8 +513,9 @@ func getEmbedInstance() mgr.PluginInstance {
addrs := beego.AppConfig.DefaultString("manager_addr", "http://127.0.0.1:2380")
inst := &EtcdEmbed{
- err: make(chan error, 1),
- ready: make(chan int),
+ err: make(chan error, 1),
+ ready: make(chan int),
+ goroutine: util.NewGo(context.Background()),
}
if core.ServerInfo.Config.SslEnabled {
@@ -537,30 +563,14 @@ func getEmbedInstance() mgr.PluginInstance {
inst.err <- err
return inst
}
- inst.Server = etcd
-
- select {
- case <-etcd.Server.ReadyNotify():
- close(inst.ready)
- go func() {
- select {
- case err := <-etcd.Err():
- inst.err <- err
- }
- }()
- case <-time.After(START_MANAGER_SERVER_TIMEOUT * time.Second):
- message := "etcd server took too long to start"
- util.Logger().Error(message, nil)
+ inst.Embed = etcd
- etcd.Server.Stop()
-
- inst.err <- errors.New(message)
- }
+ inst.ReadyNotify()
return inst
}
func parseURL(addrs string) ([]url.URL, error) {
- urls := []url.URL{}
+ var urls []url.URL
ips := strings.Split(addrs, ",")
for _, ip := range ips {
addr, err := url.Parse(ip)
diff --git a/server/plugin/infra/tracing/buildin/file_collector.go b/server/plugin/infra/tracing/buildin/file_collector.go
index bd48e5b..851b3fc 100644
--- a/server/plugin/infra/tracing/buildin/file_collector.go
+++ b/server/plugin/infra/tracing/buildin/file_collector.go
@@ -23,15 +23,19 @@ import (
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
"github.com/apache/incubator-servicecomb-service-center/server/core"
"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
+ "golang.org/x/net/context"
"os"
+ "strings"
"time"
)
type FileCollector struct {
Fd *os.File
+ Timeout time.Duration
Interval time.Duration
BatchSize int
c chan *zipkincore.Span
+ goroutine *util.GoRoutine
}
func (f *FileCollector) Collect(span *zipkincore.Span) error {
@@ -39,11 +43,16 @@ func (f *FileCollector) Collect(span *zipkincore.Span) error {
return fmt.Errorf("required FD to write")
}
- f.c <- span
+ select {
+ case f.c <- span:
+ case <-time.After(f.Timeout):
+ util.Logger().Errorf(nil, "send span to handle channel timed out(%s)", f.Timeout)
+ }
return nil
}
func (f *FileCollector) Close() error {
+ f.goroutine.Close(true)
return f.Fd.Close()
}
@@ -77,7 +86,7 @@ func (f *FileCollector) write(batch []*zipkincore.Span) (c int) {
}
func (f *FileCollector) checkFile() error {
- if util.PathExist(f.Fd.Name()) {
+ if util.PathExist(f.Fd.Name()) || strings.Index(f.Fd.Name(), "/dev/") == 0 {
return nil
}
@@ -100,52 +109,54 @@ func (f *FileCollector) checkFile() error {
return nil
}
-func (f *FileCollector) Run(stopCh <-chan struct{}) {
- var (
- batch []*zipkincore.Span
- prev []*zipkincore.Span
- i = f.Interval * 10
- t = time.NewTicker(f.Interval)
- nr = time.Now().Add(i)
- max = f.BatchSize * 2
- )
- for {
- select {
- case <-stopCh:
- f.write(batch)
- return
- case span := <-f.c:
- batch = append(batch, span)
- if len(batch) >= f.BatchSize {
- if len(batch) > max {
- dispose := len(batch) - f.BatchSize
- util.Logger().Errorf(nil, "backlog is full, dispose %d span(s), max: %d",
- dispose, max)
- batch = batch[dispose:] // allocate more
- }
- if c := f.write(batch); c == 0 {
- continue
+func (f *FileCollector) Run() {
+ f.goroutine.Do(func(ctx context.Context) {
+ var (
+ batch []*zipkincore.Span
+ prev []*zipkincore.Span
+ i = f.Interval * 10
+ t = time.NewTicker(f.Interval)
+ nr = time.Now().Add(i)
+ max = f.BatchSize * 2
+ )
+ for {
+ select {
+ case <-ctx.Done():
+ f.write(batch)
+ return
+ case span := <-f.c:
+ batch = append(batch, span)
+ if len(batch) >= f.BatchSize {
+ if len(batch) > max {
+ dispose := len(batch) - f.BatchSize
+ util.Logger().Errorf(nil, "backlog is full, dispose %d span(s), max: %d",
+ dispose, max)
+ batch = batch[dispose:] // allocate more
+ }
+ if c := f.write(batch); c == 0 {
+ continue
+ }
+ if prev != nil {
+ batch, prev = prev[:0], batch
+ } else {
+ prev, batch = batch, batch[len(batch):] // new one
+ }
}
- if prev != nil {
- batch, prev = prev[:0], batch
- } else {
- prev, batch = batch, batch[len(batch):] // new one
+ case <-t.C:
+ if time.Now().After(nr) {
+ util.LogRotateFile(f.Fd.Name(),
+ int(core.ServerInfo.Config.LogRotateSize),
+ int(core.ServerInfo.Config.LogBackupCount),
+ )
+ nr = time.Now().Add(i)
}
- }
- case <-t.C:
- if time.Now().After(nr) {
- util.LogRotateFile(f.Fd.Name(),
- int(core.ServerInfo.Config.LogRotateSize),
- int(core.ServerInfo.Config.LogBackupCount),
- )
- nr = time.Now().Add(i)
- }
- if c := f.write(batch); c > 0 {
- batch = batch[:0]
+ if c := f.write(batch); c > 0 {
+ batch = batch[:0]
+ }
}
}
- }
+ })
}
func NewFileCollector(path string) (*FileCollector, error) {
@@ -155,10 +166,12 @@ func NewFileCollector(path string) (*FileCollector, error) {
}
fc := &FileCollector{
Fd: fd,
+ Timeout: 5 * time.Second,
Interval: 10 * time.Second,
BatchSize: 100,
c: make(chan *zipkincore.Span, 1000),
+ goroutine: util.NewGo(context.Background()),
}
- util.Go(fc.Run)
+ fc.Run()
return fc, nil
}
diff --git a/server/plugin/infra/tracing/buildin/file_collector_test.go b/server/plugin/infra/tracing/buildin/file_collector_test.go
index 075fdae..4f3e150 100644
--- a/server/plugin/infra/tracing/buildin/file_collector_test.go
+++ b/server/plugin/infra/tracing/buildin/file_collector_test.go
@@ -17,34 +17,33 @@
package buildin
import (
+ "fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
+ "golang.org/x/net/context"
"os"
"testing"
"time"
)
func TestFileCollector_Collect(t *testing.T) {
- fileName := "./test"
- fd, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
- if err != nil {
- t.FailNow()
- }
fc := &FileCollector{
- Fd: fd,
+ Fd: os.Stdout,
+ Timeout: 1 * time.Second,
Interval: 100 * time.Second,
BatchSize: 2,
- c: make(chan *zipkincore.Span, 1000),
+ c: make(chan *zipkincore.Span, 100),
+ goroutine: util.NewGo(context.Background()),
}
defer func() {
fc.Close()
- os.Remove(fileName)
}()
- util.Go(fc.Run)
+ fc.Run()
- for i := int64(0); i < 3; i++ {
- err := fc.Collect(&zipkincore.Span{ParentID: &i, TraceIDHigh: &i})
+ for i := 0; i < 10; i++ {
+ err := fc.Collect(&zipkincore.Span{})
if err != nil {
+ fmt.Println(err)
t.FailNow()
}
}
diff --git a/server/plugin/infra/tracing/buildin/span.go b/server/plugin/infra/tracing/buildin/span.go
index 5b8011d..3d20088 100644
--- a/server/plugin/infra/tracing/buildin/span.go
+++ b/server/plugin/infra/tracing/buildin/span.go
@@ -61,7 +61,9 @@ type Endpoint struct {
func (s *Span) FromZipkinSpan(span *zipkincore.Span) {
traceId := new(types.TraceID)
traceId.Low = uint64(span.TraceID)
- traceId.High = uint64(*(span.TraceIDHigh))
+ if span.TraceIDHigh != nil {
+ traceId.High = uint64(*(span.TraceIDHigh))
+ }
s.TraceID = traceId.ToHex()
s.Duration = span.Duration
diff --git a/server/plugin/infra/tracing/buildin/span_test.go b/server/plugin/infra/tracing/buildin/span_test.go
index c3dceb4..069e6a9 100644
--- a/server/plugin/infra/tracing/buildin/span_test.go
+++ b/server/plugin/infra/tracing/buildin/span_test.go
@@ -158,4 +158,12 @@ func TestFromZipkinSpan(t *testing.T) {
t.FailNow()
}
fmt.Println(string(b))
+
+ s = FromZipkinSpan(&zipkincore.Span{})
+ b, err = json.Marshal(s)
+ if err != nil {
+ fmt.Println("TestFromZipkinSpan Marshal", err)
+ t.FailNow()
+ }
+ fmt.Println(string(b))
}
diff --git a/server/server.go b/server/server.go
index a546879..15e0abd 100644
--- a/server/server.go
+++ b/server/server.go
@@ -43,6 +43,7 @@ func init() {
store: st.Store(),
notifyService: nf.GetNotifyService(),
apiServer: GetAPIServer(),
+ goroutine: util.NewGo(context.Background()),
}
}
@@ -50,6 +51,7 @@ type ServiceCenterServer struct {
apiServer *APIServer
notifyService *nf.NotifyService
store *st.KvStore
+ goroutine *util.GoRoutine
}
func (s *ServiceCenterServer) Run() {
@@ -74,7 +76,7 @@ func (s *ServiceCenterServer) waitForQuit() {
s.Stop()
- util.Logger().Warn("service center quit", nil)
+ util.Logger().Debugf("service center stopped")
}
func (s *ServiceCenterServer) needUpgrade() bool {
@@ -119,12 +121,12 @@ func (s *ServiceCenterServer) autoCompactBackend() {
util.Logger().Errorf(err, "invalid compact interval %s, reset to default interval 12h", core.ServerInfo.Config.CompactInterval)
interval = 12 * time.Hour
}
- util.Go(func(stopCh <-chan struct{}) {
+ s.goroutine.Do(func(ctx context.Context) {
util.Logger().Infof("enabled the automatic compact mechanism, compact once every %s, reserve %d",
core.ServerInfo.Config.CompactInterval, delta)
for {
select {
- case <-stopCh:
+ case <-ctx.Done():
return
case <-time.After(interval):
lock, err := mux.Try(mux.GLOBAL_LOCK)
@@ -133,7 +135,7 @@ func (s *ServiceCenterServer) autoCompactBackend() {
continue
}
- backend.Registry().Compact(context.Background(), delta)
+ backend.Registry().Compact(ctx, delta)
lock.Unlock()
}
@@ -190,9 +192,7 @@ func (s *ServiceCenterServer) Stop() {
s.store.Stop()
}
- util.GoCloseAndWait()
-
- backend.Registry().Close()
+ s.goroutine.Close(true)
}
func Run() {
diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go
index 7bbe5df..6961c81 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -50,7 +50,7 @@ func (h *DependencyEventHandler) OnEvent(evt *store.KvEvent) {
}
func (h *DependencyEventHandler) loop() {
- util.Go(func(stopCh <-chan struct{}) {
+ util.Go(func(ctx context.Context) {
waitDelayIndex := 0
waitDelay := []int{1, 1, 5, 10, 20, 30, 60}
retry := func() {
@@ -64,7 +64,7 @@ func (h *DependencyEventHandler) loop() {
}
for {
select {
- case <-stopCh:
+ case <-ctx.Done():
return
case <-h.signals.Chan():
lock, err := mux.Try(mux.DEP_QUEUE_LOCK)
diff --git a/server/service/instances.go b/server/service/instances.go
index 48c7029..eb94506 100644
--- a/server/service/instances.go
+++ b/server/service/instances.go
@@ -370,19 +370,7 @@ func (s *InstanceService) HeartbeatSet(ctx context.Context, in *pb.HeartbeatSetR
existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId] = true
noMultiCounter++
}
- go func(element *pb.HeartbeatSetElement) {
- hbRst := &pb.InstanceHbRst{
- ServiceId: element.ServiceId,
- InstanceId: element.InstanceId,
- ErrMessage: "",
- }
- _, _, err, _ := serviceUtil.HeartbeatUtil(ctx, domainProject, element.ServiceId, element.InstanceId)
- if err != nil {
- hbRst.ErrMessage = err.Error()
- util.Logger().Errorf(err, "heartbeat set failed, %s/%s", element.ServiceId, element.InstanceId)
- }
- instancesHbRst <- hbRst
- }(heartbeatElement)
+ util.Go(getHeartbeatFunc(ctx, domainProject, instancesHbRst, heartbeatElement))
}
count := 0
successFlag := false
@@ -415,6 +403,22 @@ func (s *InstanceService) HeartbeatSet(ctx context.Context, in *pb.HeartbeatSetR
}
}
+func getHeartbeatFunc(ctx context.Context, domainProject string, instancesHbRst chan<- *pb.InstanceHbRst, element *pb.HeartbeatSetElement) func(context.Context) {
+ return func(_ context.Context) {
+ hbRst := &pb.InstanceHbRst{
+ ServiceId: element.ServiceId,
+ InstanceId: element.InstanceId,
+ ErrMessage: "",
+ }
+ _, _, err, _ := serviceUtil.HeartbeatUtil(ctx, domainProject, element.ServiceId, element.InstanceId)
+ if err != nil {
+ hbRst.ErrMessage = err.Error()
+ util.Logger().Errorf(err, "heartbeat set failed, %s/%s", element.ServiceId, element.InstanceId)
+ }
+ instancesHbRst <- hbRst
+ }
+}
+
func (s *InstanceService) GetOneInstance(ctx context.Context, in *pb.GetOneInstanceRequest) (*pb.GetOneInstanceResponse, error) {
checkErr := s.getInstancePreCheck(ctx, in)
if checkErr != nil {
@@ -723,7 +727,6 @@ func (s *InstanceService) UpdateInstanceProperties(ctx context.Context, in *pb.U
}, nil
}
-
func (s *InstanceService) WatchPreOpera(ctx context.Context, in *pb.WatchInstanceRequest) error {
if in == nil || len(in.SelfServiceId) == 0 {
return errors.New("Request format invalid.")
@@ -742,7 +745,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream pb.ServiceIn
return err
}
domainProject := util.ParseDomainProject(stream.Context())
- watcher := nf.NewInstanceWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/")
+ watcher := nf.NewInstanceListWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/", nil)
err = nf.GetNotifyService().AddSubscriber(watcher)
util.Logger().Infof("start watch instance status, watcher %s %s", watcher.Subject(), watcher.Id())
return nf.HandleWatchJob(watcher, stream, nf.GetNotifyService().Config.NotifyTimeout)
@@ -754,7 +757,7 @@ func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstan
nf.EstablishWebSocketError(conn, err)
return
}
- nf.DoWebSocketWatch(ctx, in.SelfServiceId, conn)
+ nf.DoWebSocketListAndWatch(ctx, in.SelfServiceId, nil, conn)
}
func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
diff --git a/server/service/microservices.go b/server/service/microservices.go
index 4fe47d2..6663ab9 100644
--- a/server/service/microservices.go
+++ b/server/service/microservices.go
@@ -359,11 +359,6 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De
nuoMultilCount++
}
- serviceRst := &pb.DelServicesRspInfo{
- ServiceId: serviceId,
- ErrMessage: "",
- }
-
//检查服务ID合法性
in := &pb.DeleteServiceRequest{
ServiceId: serviceId,
@@ -372,22 +367,15 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De
err := apt.Validate(in)
if err != nil {
util.Logger().Errorf(err, "delete micro-service failed, serviceId is %s: invalid parameters.", in.ServiceId)
- serviceRst.ErrMessage = err.Error()
- serviceRespChan <- serviceRst
+ serviceRespChan <- &pb.DelServicesRspInfo{
+ ServiceId: serviceId,
+ ErrMessage: err.Error(),
+ }
continue
}
//执行删除服务操作
- go func(serviceItem string) {
- resp, err := s.DeleteServicePri(ctx, serviceItem, request.Force)
- if err != nil {
- serviceRst.ErrMessage = err.Error()
- } else if resp.Code != pb.Response_SUCCESS {
- serviceRst.ErrMessage = resp.Message
- }
-
- serviceRespChan <- serviceRst
- }(serviceId)
+ util.Go(s.getDeleteServiceFunc(ctx, serviceId, request.Force, serviceRespChan))
}
//获取批量删除服务的结果
@@ -419,6 +407,23 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De
return resp, nil
}
+func (s *MicroServiceService) getDeleteServiceFunc(ctx context.Context, serviceId string, force bool, serviceRespChan chan<- *pb.DelServicesRspInfo) func(context.Context) {
+ return func(_ context.Context) {
+ serviceRst := &pb.DelServicesRspInfo{
+ ServiceId: serviceId,
+ ErrMessage: "",
+ }
+ resp, err := s.DeleteServicePri(ctx, serviceId, force)
+ if err != nil {
+ serviceRst.ErrMessage = err.Error()
+ } else if resp.Code != pb.Response_SUCCESS {
+ serviceRst.ErrMessage = resp.Message
+ }
+
+ serviceRespChan <- serviceRst
+ }
+}
+
func (s *MicroServiceService) GetOne(ctx context.Context, in *pb.GetServiceRequest) (*pb.GetServiceResponse, error) {
if in == nil || len(in.ServiceId) == 0 {
return &pb.GetServiceResponse{
@@ -437,7 +442,7 @@ func (s *MicroServiceService) GetOne(ctx context.Context, in *pb.GetServiceReque
service, err := serviceUtil.GetService(ctx, domainProject, in.ServiceId)
if err != nil {
- util.Logger().Errorf(err, "get micro-service failed, serviceId is %s: inner err,get service failed.", in.ServiceId)
+ util.Logger().Errorf(err, "get micro-service failed, serviceId is %s: inner err, get service failed.", in.ServiceId)
return &pb.GetServiceResponse{
Response: pb.CreateResponse(scerr.ErrInternal, "Get service file failed."),
}, err
@@ -655,7 +660,7 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
//create rules
if in.Rules != nil && len(in.Rules) != 0 {
chanLen++
- go func() {
+ util.Go(func(_ context.Context) {
req := &pb.AddServiceRulesRequest{
ServiceId: serviceId,
Rules: in.Rules,
@@ -670,12 +675,12 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
chanRsp.Message = rsp.Response.Message
}
createRespChan <- chanRsp
- }()
+ })
}
//create tags
if in.Tags != nil && len(in.Tags) != 0 {
chanLen++
- go func() {
+ util.Go(func(_ context.Context) {
req := &pb.AddServiceTagsRequest{
ServiceId: serviceId,
Tags: in.Tags,
@@ -690,12 +695,12 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
chanRsp.Message = rsp.Response.Message
}
createRespChan <- chanRsp
- }()
+ })
}
// create instance
if in.Instances != nil && len(in.Instances) != 0 {
chanLen++
- go func() {
+ util.Go(func(_ context.Context) {
chanRsp := &pb.Response{}
for _, ins := range in.Instances {
req := &pb.RegisterInstanceRequest{
@@ -711,7 +716,7 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
}
createRespChan <- chanRsp
}
- }()
+ })
}
// handle result
diff --git a/server/service/notification/listwatcher.go b/server/service/notification/listwatcher.go
index 8a34076..de0e2c6 100644
--- a/server/service/notification/listwatcher.go
+++ b/server/service/notification/listwatcher.go
@@ -19,6 +19,7 @@ package notification
import (
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+ "golang.org/x/net/context"
"time"
)
@@ -44,10 +45,10 @@ func (w *ListWatcher) OnAccept() {
}
util.Logger().Debugf("accepted by notify service, %s watcher %s %s", w.Type(), w.Id(), w.Subject())
- go w.listAndPublishJobs()
+ util.Go(w.listAndPublishJobs)
}
-func (w *ListWatcher) listAndPublishJobs() {
+func (w *ListWatcher) listAndPublishJobs(_ context.Context) {
defer close(w.listCh)
if w.ListFunc == nil {
return
@@ -112,10 +113,6 @@ func NewWatchJob(nType NotifyType, subscriberId, subject string, rev int64, resp
}
}
-func NewWatcher(nType NotifyType, id string, subject string) *ListWatcher {
- return NewListWatcher(nType, id, subject, nil)
-}
-
func NewListWatcher(nType NotifyType, id string, subject string,
listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher {
watcher := &ListWatcher{
diff --git a/server/service/notification/notification_service.go b/server/service/notification/notification_service.go
index e2d7864..63f78a2 100644
--- a/server/service/notification/notification_service.go
+++ b/server/service/notification/notification_service.go
@@ -20,6 +20,7 @@ import (
"container/list"
"errors"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
+ "golang.org/x/net/context"
"sync"
"time"
)
@@ -33,7 +34,8 @@ var notifyService *NotifyService
func init() {
notifyService = &NotifyService{
- isClose: true,
+ isClose: true,
+ goroutine: util.NewGo(context.Background()),
}
}
@@ -46,13 +48,14 @@ type serviceIndex map[NotifyType]subscriberSubjectIndex
type NotifyService struct {
Config NotifyServiceConfig
- services serviceIndex
- queues map[NotifyType]chan NotifyJob
- waits sync.WaitGroup
- mutexes map[NotifyType]*sync.Mutex
- err chan error
- closeMux sync.RWMutex
- isClose bool
+ services serviceIndex
+ queues map[NotifyType]chan NotifyJob
+ waits sync.WaitGroup
+ mutexes map[NotifyType]*sync.Mutex
+ err chan error
+ closeMux sync.RWMutex
+ isClose bool
+ goroutine *util.GoRoutine
}
func (s *NotifyService) Err() <-chan error {
@@ -150,41 +153,52 @@ func (s *NotifyService) AddJob(job NotifyJob) error {
}
}
-func (s *NotifyService) publish2Subscriber(t NotifyType) {
- defer s.waits.Done()
- for job := range s.queues[t] {
- util.Logger().Infof("notification service got a job %s: %s to notify subscriber %s",
- job.Type(), job.Subject(), job.SubscriberId())
+func (s *NotifyService) getPublish2SubscriberFunc(t NotifyType) func(context.Context) {
+ return func(ctx context.Context) {
+ defer s.waits.Done()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case job, ok := <-s.queues[t]:
+ if !ok {
+ return
+ }
- s.mutexes[t].Lock()
+ util.Logger().Infof("notification service got a job %s: %s to notify subscriber %s",
+ job.Type(), job.Subject(), job.SubscriberId())
- if s.Closed() && len(s.services[t]) == 0 {
- s.mutexes[t].Unlock()
- return
- }
+ s.mutexes[t].Lock()
- m, ok := s.services[t][job.Subject()]
- if ok {
- // publish的subject如果带上id,则单播,否则广播
- if len(job.SubscriberId()) != 0 {
- ns, ok := m[job.SubscriberId()]
+ if s.Closed() && len(s.services[t]) == 0 {
+ s.mutexes[t].Unlock()
+ return
+ }
+
+ m, ok := s.services[t][job.Subject()]
if ok {
- for n := ns.Front(); n != nil; n = n.Next() {
- n.Value.(Subscriber).OnMessage(job)
+ // publish的subject如果带上id,则单播,否则广播
+ if len(job.SubscriberId()) != 0 {
+ ns, ok := m[job.SubscriberId()]
+ if ok {
+ for n := ns.Front(); n != nil; n = n.Next() {
+ n.Value.(Subscriber).OnMessage(job)
+ }
+ }
+ s.mutexes[t].Unlock()
+ continue
+ }
+ for key := range m {
+ ns := m[key]
+ for n := ns.Front(); n != nil; n = n.Next() {
+ n.Value.(Subscriber).OnMessage(job)
+ }
}
}
+
s.mutexes[t].Unlock()
- continue
- }
- for key := range m {
- ns := m[key]
- for n := ns.Front(); n != nil; n = n.Next() {
- n.Value.(Subscriber).OnMessage(job)
- }
}
}
-
- s.mutexes[t].Unlock()
}
}
@@ -227,7 +241,7 @@ func (s *NotifyService) Start() {
util.Logger().Debugf("notify service is started with config %s", s.Config)
for i := NotifyType(0); i != typeEnd; i++ {
- go s.publish2Subscriber(i)
+ s.goroutine.Do(s.getPublish2SubscriberFunc(i))
}
}
@@ -255,6 +269,8 @@ func (s *NotifyService) Stop() {
close(s.err)
+ s.goroutine.Close(true)
+
util.Logger().Debug("notify service stopped.")
}
diff --git a/server/service/notification/watch_util.go b/server/service/notification/watch_util.go
index c938719..9a7e26c 100644
--- a/server/service/notification/watch_util.go
+++ b/server/service/notification/watch_util.go
@@ -62,6 +62,7 @@ type WebSocketHandler struct {
watcher *ListWatcher
needPingWatcher bool
closed chan struct{}
+ goroutine *util.GoRoutine
}
func (wh *WebSocketHandler) Init() error {
@@ -101,7 +102,7 @@ func (wh *WebSocketHandler) websocketHeartbeat(messageType int) error {
return nil
}
-func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage() {
+func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage(ctx context.Context) {
defer close(wh.closed)
remoteAddr := wh.conn.RemoteAddr().String()
@@ -128,17 +129,23 @@ func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage() {
})
for {
- _, _, err := wh.conn.ReadMessage()
- if err != nil {
- wh.watcher.SetError(err)
+ select {
+ case <-ctx.Done():
return
+ default:
+ _, _, err := wh.conn.ReadMessage()
+ if err != nil {
+ wh.watcher.SetError(err)
+ return
+ }
}
}
}
func (wh *WebSocketHandler) HandleWatchWebSocketJob() {
- remoteAddr := wh.conn.RemoteAddr().String()
+ wh.goroutine.Do(wh.HandleWatchWebSocketControlMessage)
+ remoteAddr := wh.conn.RemoteAddr().String()
for {
select {
case <-wh.closed:
@@ -224,8 +231,10 @@ func (wh *WebSocketHandler) HandleWatchWebSocketJob() {
}
func (wh *WebSocketHandler) Close(code int, text string) error {
+ defer wh.goroutine.Close(true)
+
remoteAddr := wh.conn.RemoteAddr().String()
- message := []byte{}
+ var message []byte
if code != websocket.CloseNoStatusReceived {
message = websocket.FormatCloseMessage(code, text)
}
@@ -238,18 +247,6 @@ func (wh *WebSocketHandler) Close(code int, text string) error {
return nil
}
-func DoWebSocketWatch(ctx context.Context, serviceId string, conn *websocket.Conn) {
- domainProject := util.ParseDomainProject(ctx)
- handler := &WebSocketHandler{
- ctx: ctx,
- conn: conn,
- watcher: NewInstanceWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/"),
- needPingWatcher: true,
- closed: make(chan struct{}),
- }
- processHandler(handler)
-}
-
func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
domainProject := util.ParseDomainProject(ctx)
handler := &WebSocketHandler{
@@ -258,6 +255,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([]
watcher: NewInstanceListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f),
needPingWatcher: true,
closed: make(chan struct{}),
+ goroutine: util.NewGo(context.Background()),
}
processHandler(handler)
}
@@ -266,7 +264,6 @@ func processHandler(handler *WebSocketHandler) {
if err := handler.Init(); err != nil {
return
}
- go handler.HandleWatchWebSocketControlMessage()
handler.HandleWatchWebSocketJob()
}
@@ -294,10 +291,6 @@ func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey
}
}
-func NewInstanceWatcher(selfServiceId, instanceRoot string) *ListWatcher {
- return NewWatcher(INSTANCE, selfServiceId, instanceRoot)
-}
-
func NewInstanceListWatcher(selfServiceId, instanceRoot string, listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher {
return NewListWatcher(INSTANCE, selfServiceId, instanceRoot, listFunc)
}
diff --git a/server/service/util/dependency.go b/server/service/util/dependency.go
index 3c91256..fb5741b 100644
--- a/server/service/util/dependency.go
+++ b/server/service/util/dependency.go
@@ -557,11 +557,10 @@ type Dependency struct {
func (dep *Dependency) RemoveConsumerOfProviderRule() {
dep.chanNum++
- go dep.removeConsumerOfProviderRule()
+ util.Go(dep.removeConsumerOfProviderRule)
}
-func (dep *Dependency) removeConsumerOfProviderRule() {
- ctx := context.TODO()
+func (dep *Dependency) removeConsumerOfProviderRule(ctx context.Context) {
opts := make([]registry.PluginOp, 0, len(dep.removedDependencyRuleList))
for _, providerRule := range dep.removedDependencyRuleList {
proProkey := apt.GenerateProviderDependencyRuleKey(providerRule.Tenant, providerRule)
@@ -605,11 +604,10 @@ func (dep *Dependency) removeConsumerOfProviderRule() {
func (dep *Dependency) AddConsumerOfProviderRule() {
dep.chanNum++
- go dep.addConsumerOfProviderRule()
+ util.Go(dep.addConsumerOfProviderRule)
}
-func (dep *Dependency) addConsumerOfProviderRule() {
- ctx := context.TODO()
+func (dep *Dependency) addConsumerOfProviderRule(ctx context.Context) {
opts := []registry.PluginOp{}
for _, providerRule := range dep.NewDependencyRuleList {
proProkey := apt.GenerateProviderDependencyRuleKey(providerRule.Tenant, providerRule)
--
To stop receiving notification emails like this one, please contact
littlecui@apache.org.