You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/04/13 03:59:58 UTC

[incubator-servicecomb-service-center] branch master updated: SCB-472 Null point reference in zipkin plugin (#325)

This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c4143c  SCB-472 Null point reference in zipkin plugin (#325)
6c4143c is described below

commit 6c4143ca52917cbdf3933b59864b91aedbb14e2d
Author: little-cui <su...@qq.com>
AuthorDate: Fri Apr 13 11:59:56 2018 +0800

    SCB-472 Null point reference in zipkin plugin (#325)
    
    * SCB-472 Null point reference in zipkin plugin
    
    * SCB-472 add timeout
    
    * SCB-472 remove type convert
    
    * SCB-472 catch goroutine panic
    
    * SCB-472 catch goroutine panic
    
    * SCB-472 Fix the UT failures
    
    * SCB-472 Fix the UT failures
---
 etc/conf/app.conf                                  |  12 +--
 main.go                                            |  12 ++-
 pkg/async/async_task.go                            |  29 ++++--
 pkg/chain/callback.go                              |   6 +-
 pkg/etcdsync/mutex.go                              |   8 +-
 pkg/grace/grace.go                                 |  29 +++---
 pkg/util/goroutines.go                             |  48 +++++-----
 pkg/util/goroutines_test.go                        |  95 +++++++++----------
 pkg/util/log.go                                    |  13 +--
 pkg/util/logrotate.go                              |   5 +-
 server/api.go                                      |  22 +++--
 server/broker/service.go                           |   2 +-
 server/broker/store.go                             |  32 ++++++-
 server/broker/util.go                              |  22 +++--
 server/core/0_init.go                              |   8 +-
 server/core/backend/store/cacher.go                |  35 +++----
 server/core/backend/store/defer.go                 |   7 +-
 server/core/backend/store/indexer.go               |   6 +-
 server/core/backend/store/listwatch.go             |  10 +-
 server/core/backend/store/store.go                 |  16 +++-
 server/infra/registry/registry.go                  |   4 +-
 .../infra/registry/embededetcd/embededetcd.go      |  84 +++++++++--------
 .../plugin/infra/tracing/buildin/file_collector.go | 101 ++++++++++++---------
 .../infra/tracing/buildin/file_collector_test.go   |  21 ++---
 server/plugin/infra/tracing/buildin/span.go        |   4 +-
 server/plugin/infra/tracing/buildin/span_test.go   |   8 ++
 server/server.go                                   |  14 +--
 server/service/event/dependency_event_handler.go   |   4 +-
 server/service/instances.go                        |  35 +++----
 server/service/microservices.go                    |  53 ++++++-----
 server/service/notification/listwatcher.go         |   9 +-
 .../service/notification/notification_service.go   |  86 +++++++++++-------
 server/service/notification/watch_util.go          |  39 ++++----
 server/service/util/dependency.go                  |  10 +-
 34 files changed, 491 insertions(+), 398 deletions(-)

diff --git a/etc/conf/app.conf b/etc/conf/app.conf
index 02786d9..d468dbe 100644
--- a/etc/conf/app.conf
+++ b/etc/conf/app.conf
@@ -109,6 +109,12 @@ log_format = text
 log_sys = false
 
 ###################################################################
+# Frontend Configurations
+###################################################################
+frontend_host_ip=127.0.0.1
+frontend_host_port=30103
+
+###################################################################
 # above is the global configurations
 # you can overide above configuration in specific env
 ###################################################################
@@ -120,9 +126,3 @@ logfile = ./service-center.log
 [dev]
 loglevel = DEBUG
 logfile = ""
-
-###################################################################
-# Frontend Configurations
-###################################################################
-frontend_host_ip=127.0.0.1
-frontend_host_port=30103
diff --git a/main.go b/main.go
index d76e34c..3d7ac97 100644
--- a/main.go
+++ b/main.go
@@ -18,8 +18,18 @@ package main
 
 // plugins
 import _ "github.com/apache/incubator-servicecomb-service-center/server/bootstrap"
-import "github.com/apache/incubator-servicecomb-service-center/server"
+import (
+	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
+	"github.com/apache/incubator-servicecomb-service-center/server"
+	"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
+)
 
 func main() {
 	server.Run()
+
+	util.GoCloseAndWait()
+
+	backend.Registry().Close()
+
+	util.Logger().Warn("service center exited", nil)
 }
diff --git a/pkg/async/async_task.go b/pkg/async/async_task.go
index 14727a1..73ceb38 100644
--- a/pkg/async/async_task.go
+++ b/pkg/async/async_task.go
@@ -39,6 +39,7 @@ type scheduler struct {
 	queue      *util.UniQueue
 	latestTask AsyncTask
 	once       sync.Once
+	goroutine  *util.GoRoutine
 }
 
 func (s *scheduler) AddTask(ctx context.Context, task AsyncTask) (err error) {
@@ -47,7 +48,7 @@ func (s *scheduler) AddTask(ctx context.Context, task AsyncTask) (err error) {
 	}
 
 	s.once.Do(func() {
-		go s.do()
+		s.goroutine.Do(s.do)
 	})
 
 	err = s.queue.Put(ctx, task)
@@ -57,15 +58,17 @@ func (s *scheduler) AddTask(ctx context.Context, task AsyncTask) (err error) {
 	return s.latestTask.Err()
 }
 
-func (s *scheduler) do() {
+func (s *scheduler) do(ctx context.Context) {
 	for {
 		select {
+		case <-ctx.Done():
+			return
 		case task, ok := <-s.queue.Chan():
 			if !ok {
 				return
 			}
 			at := task.(AsyncTask)
-			at.Do(context.Background())
+			at.Do(ctx)
 			s.latestTask = at
 		}
 	}
@@ -73,6 +76,15 @@ func (s *scheduler) do() {
 
 func (s *scheduler) Close() {
 	s.queue.Close()
+	s.goroutine.Close(true)
+}
+
+func newScheduler(task AsyncTask) *scheduler {
+	return &scheduler{
+		queue:      util.NewUniQueue(),
+		latestTask: task,
+		goroutine:  util.NewGo(context.Background()),
+	}
 }
 
 type AsyncTaskService struct {
@@ -99,10 +111,7 @@ func (lat *AsyncTaskService) getOrNewScheduler(task AsyncTask) (s *scheduler, is
 		s, ok = lat.schedules[key]
 		if !ok {
 			isNew = true
-			s = &scheduler{
-				queue:      util.NewUniQueue(),
-				latestTask: task,
-			}
+			s = newScheduler(task)
 			lat.schedules[key] = s
 		}
 		lat.lock.Unlock()
@@ -166,11 +175,11 @@ func (lat *AsyncTaskService) LatestHandled(key string) (AsyncTask, error) {
 	return s.latestTask, nil
 }
 
-func (lat *AsyncTaskService) daemon(stopCh <-chan struct{}) {
+func (lat *AsyncTaskService) daemon(ctx context.Context) {
 	util.SafeCloseChan(lat.ready)
 	for {
 		select {
-		case <-stopCh:
+		case <-ctx.Done():
 			util.Logger().Debugf("daemon thread exited for AsyncTaskService is stopped")
 			return
 		case <-time.After(DEFAULT_REMOVE_TASKS_INTERVAL):
@@ -228,7 +237,7 @@ func NewAsyncTaskService() *AsyncTaskService {
 	return &AsyncTaskService{
 		schedules:   make(map[string]*scheduler, DEFAULT_MAX_SCHEDULE_COUNT),
 		removeTasks: make(map[string]struct{}, DEFAULT_MAX_SCHEDULE_COUNT),
-		goroutine:   util.NewGo(make(chan struct{})),
+		goroutine:   util.NewGo(context.Background()),
 		ready:       make(chan struct{}),
 		isClose:     true,
 	}
diff --git a/pkg/chain/callback.go b/pkg/chain/callback.go
index 9b2e7fa..3657740 100644
--- a/pkg/chain/callback.go
+++ b/pkg/chain/callback.go
@@ -47,11 +47,7 @@ func (cb *Callback) Invoke(r Result) {
 }
 
 func syncInvoke(f func(r Result), r Result) {
-	defer func() {
-		if itf := recover(); itf != nil {
-			util.LogPanic(itf)
-		}
-	}()
+	defer util.RecoverAndReport()
 	if f == nil {
 		util.Logger().Errorf(nil, "Callback function is nil. result: %s,", r)
 		return
diff --git a/pkg/etcdsync/mutex.go b/pkg/etcdsync/mutex.go
index f54ef6f..d1a35b9 100644
--- a/pkg/etcdsync/mutex.go
+++ b/pkg/etcdsync/mutex.go
@@ -134,7 +134,8 @@ func (m *DLock) Lock(wait bool) error {
 		util.Logger().Warnf(err, "Key %s is locked, waiting for other node releases it, id=%s", m.builder.key, m.id)
 
 		ctx, cancel := context.WithTimeout(m.builder.ctx, DEFAULT_LOCK_TTL*time.Second)
-		go func() {
+		util.Go(func(context.Context) {
+			defer cancel()
 			err := backend.Registry().Watch(ctx,
 				registry.WithStrKey(m.builder.key),
 				registry.WithWatchCallback(
@@ -146,10 +147,9 @@ func (m *DLock) Lock(wait bool) error {
 						return nil
 					}))
 			if err != nil {
-				util.Logger().Errorf(nil, "%s, key=%s, id=%s", err.Error(), m.builder.key, m.id)
+				util.Logger().Warnf(nil, "%s, key=%s, id=%s", err.Error(), m.builder.key, m.id)
 			}
-			cancel()
-		}()
+		})
 		select {
 		case <-ctx.Done():
 			continue // 可以重新尝试获取锁
diff --git a/pkg/grace/grace.go b/pkg/grace/grace.go
index fef6caf..3ec7cd3 100644
--- a/pkg/grace/grace.go
+++ b/pkg/grace/grace.go
@@ -20,6 +20,7 @@ import (
 	"flag"
 	"fmt"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
+	"golang.org/x/net/context"
 	"os"
 	"os/exec"
 	"os/signal"
@@ -71,7 +72,7 @@ func Init() {
 		flag.Parse()
 	}
 
-	go handleSignals()
+	util.Go(handleSignals)
 }
 
 func Before(f func()) {
@@ -111,26 +112,28 @@ func fireSignalHook(ppFlag int, sig os.Signal) {
 	}
 }
 
-func handleSignals() {
+func handleSignals(ctx context.Context) {
 	var sig os.Signal
 
 	sigCh := make(chan os.Signal)
 	signal.Notify(sigCh, registerSignals...)
 
 	for {
-		sig = <-sigCh
-		fireSignalHook(PreSignal, sig)
-		switch sig {
-		case syscall.SIGHUP:
-			util.Logger().Debugf("received signal 'SIGHUP', now forking")
-			err := fork()
-			if err != nil {
-				util.Logger().Errorf(err, "fork a process failed")
+		select {
+		case <-ctx.Done():
+			return
+		case sig = <-sigCh:
+			fireSignalHook(PreSignal, sig)
+			switch sig {
+			case syscall.SIGHUP:
+				util.Logger().Debugf("received signal '%v', now forking", sig)
+				err := fork()
+				if err != nil {
+					util.Logger().Errorf(err, "fork a process failed")
+				}
 			}
-		default:
-			util.Logger().Warnf(nil, "received signal '%v'", sig)
+			fireSignalHook(PostSignal, sig)
 		}
-		fireSignalHook(PostSignal, sig)
 	}
 }
 
diff --git a/pkg/util/goroutines.go b/pkg/util/goroutines.go
index a021f52..bb9b9d9 100644
--- a/pkg/util/goroutines.go
+++ b/pkg/util/goroutines.go
@@ -16,42 +16,37 @@
  */
 package util
 
-import "sync"
+import (
+	"golang.org/x/net/context"
+	"sync"
+)
 
 type GoRoutine struct {
-	stopCh chan struct{}
+	ctx    context.Context
+	cancel context.CancelFunc
 	wg     sync.WaitGroup
 	mux    sync.RWMutex
-	once   sync.Once
 	closed bool
 }
 
-func (g *GoRoutine) Init(stopCh chan struct{}) {
-	g.once.Do(func() {
-		g.stopCh = stopCh
-	})
-}
-
-func (g *GoRoutine) StopCh() <-chan struct{} {
-	return g.stopCh
-}
-
-func (g *GoRoutine) Do(f func(<-chan struct{})) {
+func (g *GoRoutine) Do(f func(context.Context)) {
 	g.wg.Add(1)
 	go func() {
 		defer g.wg.Done()
-		f(g.StopCh())
+		defer RecoverAndReport()
+		f(g.ctx)
 	}()
 }
 
 func (g *GoRoutine) Close(wait bool) {
 	g.mux.Lock()
 	defer g.mux.Unlock()
+
 	if g.closed {
 		return
 	}
 	g.closed = true
-	close(g.stopCh)
+	g.cancel()
 	if wait {
 		g.Wait()
 	}
@@ -61,27 +56,26 @@ func (g *GoRoutine) Wait() {
 	g.wg.Wait()
 }
 
-var defaultGo GoRoutine
+var defaultGo *GoRoutine
 
 func init() {
-	GoInit()
+	defaultGo = NewGo(context.Background())
 }
 
-func Go(f func(<-chan struct{})) {
+func Go(f func(context.Context)) {
 	defaultGo.Do(f)
 }
 
-func GoInit() {
-	defaultGo.Init(make(chan struct{}))
-}
-
 func GoCloseAndWait() {
 	defaultGo.Close(true)
-	Logger().Debugf("all goroutines quit normally")
+	Logger().Debugf("all goroutines exited")
 }
 
-func NewGo(stopCh chan struct{}) *GoRoutine {
-	gr := &GoRoutine{}
-	gr.Init(stopCh)
+func NewGo(ctx context.Context) *GoRoutine {
+	ctx, cancel := context.WithCancel(ctx)
+	gr := &GoRoutine{
+		ctx:    ctx,
+		cancel: cancel,
+	}
 	return gr
 }
diff --git a/pkg/util/goroutines_test.go b/pkg/util/goroutines_test.go
index cfc0919..d808814 100644
--- a/pkg/util/goroutines_test.go
+++ b/pkg/util/goroutines_test.go
@@ -18,72 +18,66 @@ package util
 
 import (
 	"fmt"
+	"golang.org/x/net/context"
 	"sync"
 	"testing"
 	"time"
 )
 
-func TestGoRoutine_Init(t *testing.T) {
-	var test GoRoutine
-	stopCh1 := make(chan struct{})
-	defer close(stopCh1)
-	stopCh2 := make(chan struct{})
-	defer close(stopCh2)
-
-	test.Init(stopCh1)
-	c := test.StopCh()
-	if c != stopCh1 {
-		fail(t, "init GoRoutine failed.")
-	}
-
-	test.Init(stopCh2)
-	c = test.StopCh()
-	if c == stopCh2 {
-		fail(t, "init GoRoutine twice.")
-	}
-}
-
 func TestGoRoutine_Do(t *testing.T) {
-	var test1 GoRoutine
-	stopCh := make(chan struct{})
-	test1.Init(make(chan struct{}))
-	test1.Do(func(neverStopCh <-chan struct{}) {
-		defer close(stopCh)
+	test1 := NewGo(context.Background())
+	defer test1.Close(true)
+	stopCh1 := make(chan struct{})
+	test1.Do(func(ctx context.Context) {
+		defer close(stopCh1)
 		select {
-		case <-neverStopCh:
-			fail(t, "neverStopCh should not be closed.")
+		case <-ctx.Done():
+			fail(t, "ctx should not be done.")
 		case <-time.After(time.Second):
 		}
 	})
-	<-stopCh
+	<-stopCh1
 
-	var test2 GoRoutine
-	stopCh1 := make(chan struct{})
+	ctx, cancel := context.WithCancel(context.Background())
+	test2 := NewGo(ctx)
+	defer test2.Close(true)
 	stopCh2 := make(chan struct{})
-	test2.Init(stopCh1)
-	test2.Do(func(stopCh <-chan struct{}) {
+	test2.Do(func(ctx context.Context) {
 		defer close(stopCh2)
 		select {
-		case <-stopCh:
+		case <-ctx.Done():
 		case <-time.After(time.Second):
-			fail(t, "time out to wait stopCh1 close.")
+			fail(t, "time out to wait stopCh2 close.")
 		}
 	})
-	close(stopCh1)
+	cancel()
 	<-stopCh2
+
+	ctx, _ = context.WithTimeout(context.Background(), 0)
+	test3 := NewGo(ctx)
+	defer test3.Close(true)
+	stopCh3 := make(chan struct{})
+	test3.Do(func(ctx context.Context) {
+		defer close(stopCh3)
+		select {
+		case <-ctx.Done():
+		case <-time.After(time.Second):
+			fail(t, "time out to wait ctx done.")
+		}
+	})
+	<-stopCh3
 }
 
 func TestGoRoutine_Wait(t *testing.T) {
-	var test GoRoutine
 	var mux sync.Mutex
 	MAX := 10
 	resultArr := make([]int, 0, MAX)
-	test.Init(make(chan struct{}))
+	test := NewGo(context.Background())
 	for i := 0; i < MAX; i++ {
 		func(i int) {
-			test.Do(func(neverStopCh <-chan struct{}) {
+			test.Do(func(ctx context.Context) {
 				select {
-				case <-neverStopCh:
+				case <-ctx.Done():
 				case <-time.After(time.Second):
 					mux.Lock()
 					resultArr = append(resultArr, i)
@@ -103,13 +97,12 @@ func TestGoRoutine_Wait(t *testing.T) {
 }
 
 func TestGoRoutine_Close(t *testing.T) {
-	var test GoRoutine
-	test.Init(make(chan struct{}))
-	test.Do(func(stopCh <-chan struct{}) {
+	test := NewGo(context.Background())
+	test.Do(func(ctx context.Context) {
 		select {
-		case <-stopCh:
+		case <-ctx.Done():
 		case <-time.After(time.Second):
-			fail(t, "time out to wait stopCh close.")
+			fail(t, "time out to wait ctx close.")
 		}
 	})
 	test.Close(true)
@@ -117,20 +110,18 @@ func TestGoRoutine_Close(t *testing.T) {
 }
 
 func TestGo(t *testing.T) {
-	GoInit()
-	Go(func(stopCh <-chan struct{}) {
+	Go(func(ctx context.Context) {
 		for {
 			select {
-			case <-stopCh:
+			case <-ctx.Done():
 				return
 			case <-time.After(time.Second):
 			}
 		}
 	})
+	Go(func(ctx context.Context) {
+		var a *int
+		fmt.Println(*a)
+	})
 	GoCloseAndWait()
 }
-
-func TestNewGo(t *testing.T) {
-	g := NewGo(make(chan struct{}))
-	defer g.Close(true)
-}
diff --git a/pkg/util/log.go b/pkg/util/log.go
index 5e475a7..bc82a89 100644
--- a/pkg/util/log.go
+++ b/pkg/util/log.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"github.com/ServiceComb/paas-lager"
 	"github.com/ServiceComb/paas-lager/third_party/forked/cloudfoundry/lager"
+	"golang.org/x/net/context"
 	"os"
 	"path/filepath"
 	"runtime"
@@ -57,7 +58,7 @@ func init() {
 	loggers = make(map[string]lager.Logger, 10)
 	loggerNames = make(map[string]string, 10)
 	// make LOGGER do not be nil, new a stdout logger
-	LOGGER = newLogger(fromLagerConfig(defaultLagerConfig))
+	LOGGER = NewLogger(fromLagerConfig(defaultLagerConfig))
 }
 
 func fromLagerConfig(c *stlager.Config) LoggerConfig {
@@ -82,7 +83,7 @@ func toLagerConfig(c LoggerConfig) stlager.Config {
 }
 
 // newLog new log, unsafe
-func newLogger(cfg LoggerConfig) lager.Logger {
+func NewLogger(cfg LoggerConfig) lager.Logger {
 	stlager.Init(toLagerConfig(cfg))
 	return stlager.NewLogger(cfg.LoggerFile)
 }
@@ -93,7 +94,7 @@ func InitGlobalLogger(cfg LoggerConfig) {
 		cfg.LoggerLevel = defaultLagerConfig.LoggerLevel
 	}
 	loggerConfig = cfg
-	LOGGER = newLogger(cfg)
+	LOGGER = NewLogger(cfg)
 	// log rotate
 	RunLogDirRotate(cfg)
 	// recreate the deleted log file
@@ -144,7 +145,7 @@ func Logger() lager.Logger {
 			if len(cfg.LoggerFile) != 0 {
 				cfg.LoggerFile = filepath.Join(filepath.Dir(cfg.LoggerFile), logFile+".log")
 			}
-			logger = newLogger(cfg)
+			logger = NewLogger(cfg)
 			loggers[logFile] = logger
 			LOGGER.Warnf(nil, "match %s, new logger %s for %s", prefix, logFile, funcFullName)
 		}
@@ -190,10 +191,10 @@ func monitorLogFile() {
 	if len(loggerConfig.LoggerFile) == 0 {
 		return
 	}
-	Go(func(stopCh <-chan struct{}) {
+	Go(func(ctx context.Context) {
 		for {
 			select {
-			case <-stopCh:
+			case <-ctx.Done():
 				return
 			case <-time.After(time.Minute):
 				Logger().Debug(fmt.Sprintf("Check log file at %s", time.Now()))
diff --git a/pkg/util/logrotate.go b/pkg/util/logrotate.go
index db4b79f..e7dbcef 100644
--- a/pkg/util/logrotate.go
+++ b/pkg/util/logrotate.go
@@ -19,6 +19,7 @@ package util
 import (
 	"archive/zip"
 	"fmt"
+	"golang.org/x/net/context"
 	"io"
 	"os"
 	"path/filepath"
@@ -293,10 +294,10 @@ func CopyFile(srcFile, destFile string) error {
 }
 
 func RunLogDirRotate(cfg LoggerConfig) {
-	Go(func(stopCh <-chan struct{}) {
+	Go(func(ctx context.Context) {
 		for {
 			select {
-			case <-stopCh:
+			case <-ctx.Done():
 				return
 			case <-time.After(cfg.LogRotatePeriod):
 				LogRotate(filepath.Dir(cfg.LoggerFile), cfg.LogRotateSize, cfg.LogBackupCount)
diff --git a/server/api.go b/server/api.go
index 26ac55a..f434bdc 100644
--- a/server/api.go
+++ b/server/api.go
@@ -36,8 +36,9 @@ func init() {
 	InitAPI()
 
 	apiServer = &APIServer{
-		isClose: true,
-		err:     make(chan error, 1),
+		isClose:   true,
+		err:       make(chan error, 1),
+		goroutine: util.NewGo(context.Background()),
 	}
 }
 
@@ -66,6 +67,7 @@ type APIServer struct {
 	isClose   bool
 	forked    bool
 	err       chan error
+	goroutine *util.GoRoutine
 }
 
 const (
@@ -176,16 +178,18 @@ func (s *APIServer) doAPIServerHeartBeat(pCtx context.Context) {
 }
 
 func (s *APIServer) startHeartBeatService() {
-	go func() {
+	s.goroutine.Do(func(ctx context.Context) {
 		for {
 			select {
+			case <-ctx.Done():
+				return
 			case <-s.err:
 				return
 			case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second):
 				s.doAPIServerHeartBeat(context.Background())
 			}
 		}
-	}()
+	})
 }
 
 func (s *APIServer) graceDone() {
@@ -211,14 +215,14 @@ func (s *APIServer) startRESTServer() (err error) {
 	}
 	util.Logger().Infof("Local listen address: %s, host: %s.", ep, s.HostName)
 
-	go func() {
+	s.goroutine.Do(func(_ context.Context) {
 		err := s.restSrv.Serve()
 		if s.isClose {
 			return
 		}
 		util.Logger().Errorf(err, "error to start REST API server %s", ep)
 		s.err <- err
-	}()
+	})
 	return
 }
 
@@ -234,14 +238,14 @@ func (s *APIServer) startRPCServer() (err error) {
 	}
 	util.Logger().Infof("Local listen address: %s, host: %s.", ep, s.HostName)
 
-	go func() {
+	s.goroutine.Do(func(_ context.Context) {
 		err := s.rpcSrv.Serve()
 		if s.isClose {
 			return
 		}
 		util.Logger().Errorf(err, "error to start RPC API server %s", ep)
 		s.err <- err
-	}()
+	})
 	return
 }
 
@@ -301,6 +305,8 @@ func (s *APIServer) Stop() {
 
 	close(s.err)
 
+	s.goroutine.Close(true)
+
 	util.Logger().Info("api server stopped.")
 }
 
diff --git a/server/broker/service.go b/server/broker/service.go
index bde8f3e..3ae63c6 100644
--- a/server/broker/service.go
+++ b/server/broker/service.go
@@ -34,7 +34,7 @@ import (
 	"golang.org/x/net/context"
 )
 
-var BrokerServiceAPI *BrokerService = &BrokerService{}
+var BrokerServiceAPI = &BrokerService{}
 
 type BrokerService struct {
 }
diff --git a/server/broker/store.go b/server/broker/store.go
index 0cdddc5..95c61a4 100644
--- a/server/broker/store.go
+++ b/server/broker/store.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	sstore "github.com/apache/incubator-servicecomb-service-center/server/core/backend/store"
+	"golang.org/x/net/context"
 )
 
 const (
@@ -72,12 +73,16 @@ func (s *BKvStore) newStore(t sstore.StoreType, opts ...sstore.KvCacherCfgOption
 	s.newIndexer(t, sstore.NewKvCacher(opts...))
 }
 
-func (s *BKvStore) store() {
+func (s *BKvStore) store(ctx context.Context) {
 	for t := sstore.StoreType(0); t != typeEnd; t++ {
 		s.newStore(t)
 	}
 	for _, i := range s.bindexers {
-		<-i.Ready()
+		select {
+		case <-ctx.Done():
+			return
+		case <-i.Ready():
+		}
 	}
 	util.SafeCloseChan(s.bready)
 
@@ -120,7 +125,13 @@ func (s *BKvStore) newIndexer(t sstore.StoreType, cacher sstore.Cacher) {
 }
 
 func (s *BKvStore) Run() {
-	go s.store()
+	util.Go(func(ctx context.Context) {
+		s.store(ctx)
+		select {
+		case <-ctx.Done():
+			s.Stop()
+		}
+	})
 }
 
 func (s *BKvStore) Ready() <-chan struct{} {
@@ -154,3 +165,18 @@ func (s *BKvStore) Verification() *sstore.Indexer {
 func (s *BKvStore) PactLatest() *sstore.Indexer {
 	return s.bindexers[PACT_LATEST]
 }
+
+func (s *BKvStore) Stop() {
+	if s.bisClose {
+		return
+	}
+	s.bisClose = true
+
+	for _, i := range s.bindexers {
+		i.Stop()
+	}
+
+	util.SafeCloseChan(s.bready)
+
+	util.Logger().Debugf("broker store daemon stopped")
+}
diff --git a/server/broker/util.go b/server/broker/util.go
index 23e98ca..895ee8c 100644
--- a/server/broker/util.go
+++ b/server/broker/util.go
@@ -25,14 +25,16 @@ import (
 	"strconv"
 	"strings"
 
-	"github.com/ServiceComb/paas-lager"
 	"github.com/ServiceComb/paas-lager/third_party/forked/cloudfoundry/lager"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
-	backend "github.com/apache/incubator-servicecomb-service-center/server/core/backend"
+	"github.com/apache/incubator-servicecomb-service-center/server/core"
+	"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
 	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
 	scerr "github.com/apache/incubator-servicecomb-service-center/server/error"
 	"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
 	serviceUtil "github.com/apache/incubator-servicecomb-service-center/server/service/util"
+	"path/filepath"
+	"time"
 )
 
 var PactLogger lager.Logger
@@ -88,12 +90,18 @@ var brokerAPILinksTitles = map[string]string{
 
 func init() {
 	//define Broker logger
-	stlager.Init(stlager.Config{
-		LoggerLevel:   "INFO",
-		LoggerFile:    "broker_srvc.log",
-		EnableRsyslog: false,
+	name := ""
+	if len(core.ServerInfo.Config.LogFilePath) != 0 {
+		name = filepath.Join(filepath.Dir(core.ServerInfo.Config.LogFilePath), "broker_srvc.log")
+	}
+	PactLogger = util.NewLogger(util.LoggerConfig{
+		LoggerLevel:     core.ServerInfo.Config.LogLevel,
+		LoggerFile:      name,
+		LogFormatText:   core.ServerInfo.Config.LogFormat == "text",
+		LogRotatePeriod: 30 * time.Second,
+		LogRotateSize:   int(core.ServerInfo.Config.LogRotateSize),
+		LogBackupCount:  int(core.ServerInfo.Config.LogBackupCount),
 	})
-	PactLogger = stlager.NewLogger("broker_srvc")
 }
 
 func GetDefaultTenantProject() string {
diff --git a/server/core/0_init.go b/server/core/0_init.go
index 1b88849..8749af6 100644
--- a/server/core/0_init.go
+++ b/server/core/0_init.go
@@ -87,7 +87,6 @@ func initLogger() {
 }
 
 func handleSignals() {
-	var sig os.Signal
 	sigCh := make(chan os.Signal)
 	signal.Notify(sigCh,
 		syscall.SIGINT,
@@ -95,13 +94,14 @@ func handleSignals() {
 		syscall.SIGTERM,
 	)
 	wait := 5 * time.Second
-	for {
-		sig = <-sigCh
+	for sig := range sigCh {
 		switch sig {
 		case syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM:
 			<-time.After(wait)
-			util.Logger().Warnf(nil, "Waiting for server response timed out(%s), force shutdown.", wait)
+			util.Logger().Warnf(nil, "waiting for server response timed out(%s), force shutdown", wait)
 			os.Exit(1)
+		default:
+			util.Logger().Warnf(nil, "received signal '%v'", sig)
 		}
 	}
 }
diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go
index c6ade97..39ba733 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -169,12 +169,12 @@ type KvCacher struct {
 	lastRev         int64
 	noEventInterval int
 
-	ready   chan struct{}
-	lw      ListWatcher
-	mux     sync.Mutex
-	once    sync.Once
-	cache   *KvCache
-	goroute *util.GoRoutine
+	ready     chan struct{}
+	lw        ListWatcher
+	mux       sync.Mutex
+	once      sync.Once
+	cache     *KvCache
+	goroutine *util.GoRoutine
 }
 
 func (c *KvCacher) needList() bool {
@@ -267,23 +267,18 @@ func (c *KvCacher) needDeferHandle(evts []*Event) bool {
 	return c.Cfg.DeferHandler.OnCondition(c.Cache(), evts)
 }
 
-func (c *KvCacher) refresh(stopCh <-chan struct{}) {
+func (c *KvCacher) refresh(ctx context.Context) {
 	util.Logger().Debugf("start to list and watch %s", c.Cfg)
-	ctx, cancel := context.WithCancel(context.Background())
-	c.goroute.Do(func(stopCh <-chan struct{}) {
-		defer cancel()
-		<-stopCh
-	})
 	for {
 		start := time.Now()
 		c.ListAndWatch(ctx)
 		watchDuration := time.Since(start)
-		nextPeriod := 0 * time.Second
+		nextPeriod := c.Cfg.Period
 		if watchDuration > 0 && c.Cfg.Period > watchDuration {
 			nextPeriod = c.Cfg.Period - watchDuration
 		}
 		select {
-		case <-stopCh:
+		case <-ctx.Done():
 			util.Logger().Debugf("stop to list and watch %s", c.Cfg)
 			return
 		case <-time.After(nextPeriod):
@@ -291,7 +286,7 @@ func (c *KvCacher) refresh(stopCh <-chan struct{}) {
 	}
 }
 
-func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
+func (c *KvCacher) deferHandle(ctx context.Context) {
 	if c.Cfg.DeferHandler == nil {
 		return
 	}
@@ -299,7 +294,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
 	i, evts := 0, make([]*Event, event_block_size)
 	for {
 		select {
-		case <-stopCh:
+		case <-ctx.Done():
 			return
 		case evt, ok := <-c.Cfg.DeferHandler.HandleChan():
 			if !ok {
@@ -524,8 +519,8 @@ func (c *KvCacher) onKvEvents(evts []*KvEvent) {
 }
 
 func (c *KvCacher) run() {
-	c.goroute.Do(c.refresh)
-	c.goroute.Do(c.deferHandle)
+	c.goroutine.Do(c.refresh)
+	c.goroutine.Do(c.deferHandle)
 }
 
 func (c *KvCacher) Cache() Cache {
@@ -537,7 +532,7 @@ func (c *KvCacher) Run() {
 }
 
 func (c *KvCacher) Stop() {
-	c.goroute.Close(true)
+	c.goroutine.Close(true)
 
 	util.SafeCloseChan(c.ready)
 }
@@ -577,7 +572,7 @@ func NewKvCacher(opts ...KvCacherCfgOption) *KvCacher {
 			Client: backend.Registry(),
 			Key:    cfg.Key,
 		},
-		goroute: util.NewGo(make(chan struct{})),
+		goroutine: util.NewGo(context.Background()),
 	}
 	cacher.cache = NewKvCache(cacher, cfg.InitSize)
 	return cacher
diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer.go
index d35f973..43464f0 100644
--- a/server/core/backend/store/defer.go
+++ b/server/core/backend/store/defer.go
@@ -21,6 +21,7 @@ import (
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
 	"github.com/coreos/etcd/mvcc/mvccpb"
+	"golang.org/x/net/context"
 	"sync"
 	"time"
 )
@@ -98,12 +99,12 @@ func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event {
 	return iedh.deferCh
 }
 
-func (iedh *InstanceEventDeferHandler) check(stopCh <-chan struct{}) {
+func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
 	defer util.RecoverAndReport()
 	t, n := iedh.newTimer(), false
 	for {
 		select {
-		case <-stopCh:
+		case <-ctx.Done():
 			return
 		case evts := <-iedh.pendingCh:
 			for _, evt := range evts {
@@ -117,7 +118,7 @@ func (iedh *InstanceEventDeferHandler) check(stopCh <-chan struct{}) {
 			}
 
 			total := iedh.cache.Size()
-			if !iedh.enabled && del > 0 && total > 0 && float64(del) >= float64(total)*iedh.Percent {
+			if !iedh.enabled && del > 0 && total > 5 && float64(del) >= float64(total)*iedh.Percent {
 				iedh.enabled = true
 				util.Logger().Warnf(nil, "self preservation is enabled, caught %d/%d(>=%.0f%%) DELETE events",
 					del, total, iedh.Percent*100)
diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go
index 7236050..5cf3e6d 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -186,11 +186,11 @@ func (i *Indexer) OnCacheEvent(evt *KvEvent) {
 }
 
 func (i *Indexer) buildIndex() {
-	i.goroutine.Do(func(stopCh <-chan struct{}) {
+	i.goroutine.Do(func(ctx context.Context) {
 		util.SafeCloseChan(i.ready)
 		for {
 			select {
-			case <-stopCh:
+			case <-ctx.Done():
 				return
 			case evt, ok := <-i.prefixBuildQueue:
 				if !ok {
@@ -317,7 +317,7 @@ func NewCacheIndexer(t StoreType, cr Cacher) *Indexer {
 		cacheType:        t,
 		prefixIndex:      make(map[string]map[string]struct{}, DEFAULT_MAX_EVENT_COUNT),
 		prefixBuildQueue: make(chan *KvEvent, DEFAULT_MAX_EVENT_COUNT),
-		goroutine:        util.NewGo(make(chan struct{})),
+		goroutine:        util.NewGo(context.Background()),
 		ready:            make(chan struct{}),
 		isClose:          true,
 	}
diff --git a/server/core/backend/store/listwatch.go b/server/core/backend/store/listwatch.go
index 189f909..4ffb9d4 100644
--- a/server/core/backend/store/listwatch.go
+++ b/server/core/backend/store/listwatch.go
@@ -131,17 +131,17 @@ func (w *Watcher) EventBus() <-chan []*Event {
 	return w.bus
 }
 
-func (w *Watcher) process() {
+func (w *Watcher) process(_ context.Context) {
 	stopCh := make(chan struct{})
 	ctx, cancel := context.WithTimeout(w.ListOps.Context, w.ListOps.Timeout)
-	go func() {
+	util.Go(func(_ context.Context) {
 		defer close(stopCh)
 		w.lw.doWatch(ctx, w.sendEvent)
-	}()
+	})
 
 	select {
 	case <-stopCh:
-		// time out
+		// timed out or exception
 		w.Stop()
 	case <-w.stopCh:
 		cancel()
@@ -180,6 +180,6 @@ func newWatcher(lw *ListWatcher, listOps *ListOptions) *Watcher {
 		bus:     make(chan []*Event, EVENT_BUS_MAX_SIZE),
 		stopCh:  make(chan struct{}),
 	}
-	go w.process()
+	util.Go(w.process)
 	return w
 }
diff --git a/server/core/backend/store/store.go b/server/core/backend/store/store.go
index 6a5f38c..c462d41 100644
--- a/server/core/backend/store/store.go
+++ b/server/core/backend/store/store.go
@@ -109,6 +109,7 @@ type KvStore struct {
 	asyncTaskSvc *async.AsyncTaskService
 	lock         sync.RWMutex
 	ready        chan struct{}
+	goroutine    *util.GoRoutine
 	isClose      bool
 }
 
@@ -116,6 +117,7 @@ func (s *KvStore) Initialize() {
 	s.indexers = make(map[StoreType]*Indexer)
 	s.asyncTaskSvc = async.NewAsyncTaskService()
 	s.ready = make(chan struct{})
+	s.goroutine = util.NewGo(context.Background())
 
 	for i := StoreType(0); i != typeEnd; i++ {
 		store.newNullStore(i)
@@ -147,7 +149,7 @@ func (s *KvStore) newIndexer(t StoreType, cacher Cacher) {
 }
 
 func (s *KvStore) Run() {
-	go s.store()
+	s.goroutine.Do(s.store)
 	s.asyncTaskSvc.Run()
 }
 
@@ -166,7 +168,7 @@ func (s *KvStore) SelfPreservationHandler() DeferHandler {
 	return &InstanceEventDeferHandler{Percent: DEFAULT_SELF_PRESERVATION_PERCENT}
 }
 
-func (s *KvStore) store() {
+func (s *KvStore) store(ctx context.Context) {
 	for t := StoreType(0); t != typeEnd; t++ {
 		switch t {
 		case INSTANCE:
@@ -178,7 +180,11 @@ func (s *KvStore) store() {
 		}
 	}
 	for _, i := range s.indexers {
-		<-i.Ready()
+		select {
+		case <-ctx.Done():
+			return
+		case <-i.Ready():
+		}
 	}
 	util.SafeCloseChan(s.ready)
 
@@ -214,9 +220,11 @@ func (s *KvStore) Stop() {
 
 	s.asyncTaskSvc.Stop()
 
+	s.goroutine.Close(true)
+
 	util.SafeCloseChan(s.ready)
 
-	util.Logger().Debugf("store daemon stopped.")
+	util.Logger().Debugf("store daemon stopped")
 }
 
 func (s *KvStore) Ready() <-chan struct{} {
diff --git a/server/infra/registry/registry.go b/server/infra/registry/registry.go
index 575a351..0856fa3 100644
--- a/server/infra/registry/registry.go
+++ b/server/infra/registry/registry.go
@@ -144,7 +144,7 @@ const (
 )
 
 const (
-	REQUEST_TIMEOUT = 300
+	REQUEST_TIMEOUT = 30 * time.Second
 
 	DEFAULT_PAGE_COUNT = 4096 // grpc does not allow to transport a large body more then 4MB in a request.
 )
@@ -359,7 +359,7 @@ func OpCmp(opt CompareOperation, result CompareResult, v interface{}) (cmp Compa
 }
 
 func WithTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
-	return context.WithTimeout(ctx, REQUEST_TIMEOUT*time.Second)
+	return context.WithTimeout(ctx, REQUEST_TIMEOUT)
 }
 
 func RegistryConfig() *Config {
diff --git a/server/plugin/infra/registry/embededetcd/embededetcd.go b/server/plugin/infra/registry/embededetcd/embededetcd.go
index 4b941bc..3bb7140 100644
--- a/server/plugin/infra/registry/embededetcd/embededetcd.go
+++ b/server/plugin/infra/registry/embededetcd/embededetcd.go
@@ -41,16 +41,17 @@ import (
 
 var embedTLSConfig *tls.Config
 
-const START_MANAGER_SERVER_TIMEOUT = 60
+const START_MANAGER_SERVER_TIMEOUT = 10
 
 func init() {
 	mgr.RegisterPlugin(mgr.Plugin{mgr.REGISTRY, "embeded_etcd", getEmbedInstance})
 }
 
 type EtcdEmbed struct {
-	Server *embed.Etcd
-	err    chan error
-	ready  chan int
+	Embed     *embed.Etcd
+	err       chan error
+	ready     chan int
+	goroutine *util.GoRoutine
 }
 
 func (s *EtcdEmbed) Err() <-chan error {
@@ -62,9 +63,10 @@ func (s *EtcdEmbed) Ready() <-chan int {
 }
 
 func (s *EtcdEmbed) Close() {
-	if s.Server != nil {
-		s.Server.Close()
+	if s.Embed != nil {
+		s.Embed.Close()
 	}
+	s.goroutine.Close(true)
 	util.Logger().Debugf("embedded etcd client stopped.")
 }
 
@@ -232,7 +234,7 @@ func (s *EtcdEmbed) Compact(ctx context.Context, reserve int64) error {
 	}
 
 	util.Logger().Infof("Compacting... revision is %d(current: %d, reserve %d)", revToCompact, curRev, reserve)
-	_, err := s.Server.Server.Compact(ctx, &etcdserverpb.CompactionRequest{
+	_, err := s.Embed.Server.Compact(ctx, &etcdserverpb.CompactionRequest{
 		Revision: revToCompact,
 		Physical: true,
 	})
@@ -250,7 +252,7 @@ func (s *EtcdEmbed) Compact(ctx context.Context, reserve int64) error {
 }
 
 func (s *EtcdEmbed) getLeaderCurrentRevision(ctx context.Context) int64 {
-	return s.Server.Server.KV().Rev()
+	return s.Embed.Server.KV().Rev()
 }
 
 func (s *EtcdEmbed) PutNoOverride(ctx context.Context, opts ...registry.PluginOpOption) (bool, error) {
@@ -275,7 +277,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts ...registry.PluginOpOption) (*r
 	switch op.Action {
 	case registry.Get:
 		var etcdResp *etcdserverpb.RangeResponse
-		etcdResp, err = s.Server.Server.Range(otCtx, s.toGetRequest(op))
+		etcdResp, err = s.Embed.Server.Range(otCtx, s.toGetRequest(op))
 		if err != nil {
 			break
 		}
@@ -286,7 +288,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts ...registry.PluginOpOption) (*r
 		}
 	case registry.Put:
 		var etcdResp *etcdserverpb.PutResponse
-		etcdResp, err = s.Server.Server.Put(otCtx, s.toPutRequest(op))
+		etcdResp, err = s.Embed.Server.Put(otCtx, s.toPutRequest(op))
 		if err != nil {
 			break
 		}
@@ -295,7 +297,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts ...registry.PluginOpOption) (*r
 		}
 	case registry.Delete:
 		var etcdResp *etcdserverpb.DeleteRangeResponse
-		etcdResp, err = s.Server.Server.DeleteRange(otCtx, s.toDeleteRequest(op))
+		etcdResp, err = s.Embed.Server.DeleteRange(otCtx, s.toDeleteRequest(op))
 		if err != nil {
 			break
 		}
@@ -338,7 +340,7 @@ func (s *EtcdEmbed) TxnWithCmp(ctx context.Context, success []registry.PluginOp,
 	if len(etcdFailOps) > 0 {
 		txnRequest.Failure = etcdFailOps
 	}
-	resp, err := s.Server.Server.Txn(otCtx, txnRequest)
+	resp, err := s.Embed.Server.Txn(otCtx, txnRequest)
 	if err != nil {
 		return nil, err
 	}
@@ -351,7 +353,7 @@ func (s *EtcdEmbed) TxnWithCmp(ctx context.Context, success []registry.PluginOp,
 func (s *EtcdEmbed) LeaseGrant(ctx context.Context, TTL int64) (int64, error) {
 	otCtx, cancel := registry.WithTimeout(ctx)
 	defer cancel()
-	etcdResp, err := s.Server.Server.LeaseGrant(otCtx, &etcdserverpb.LeaseGrantRequest{
+	etcdResp, err := s.Embed.Server.LeaseGrant(otCtx, &etcdserverpb.LeaseGrantRequest{
 		TTL: TTL,
 	})
 	if err != nil {
@@ -363,7 +365,7 @@ func (s *EtcdEmbed) LeaseGrant(ctx context.Context, TTL int64) (int64, error) {
 func (s *EtcdEmbed) LeaseRenew(ctx context.Context, leaseID int64) (int64, error) {
 	otCtx, cancel := registry.WithTimeout(ctx)
 	defer cancel()
-	ttl, err := s.Server.Server.LeaseRenew(otCtx, lease.LeaseID(leaseID))
+	ttl, err := s.Embed.Server.LeaseRenew(otCtx, lease.LeaseID(leaseID))
 	if err != nil {
 		if err.Error() == grpc.ErrorDesc(rpctypes.ErrGRPCLeaseNotFound) {
 			return 0, err
@@ -376,7 +378,7 @@ func (s *EtcdEmbed) LeaseRenew(ctx context.Context, leaseID int64) (int64, error
 func (s *EtcdEmbed) LeaseRevoke(ctx context.Context, leaseID int64) error {
 	otCtx, cancel := registry.WithTimeout(ctx)
 	defer cancel()
-	_, err := s.Server.Server.LeaseRevoke(otCtx, &etcdserverpb.LeaseRevokeRequest{
+	_, err := s.Embed.Server.LeaseRevoke(otCtx, &etcdserverpb.LeaseRevokeRequest{
 		ID: leaseID,
 	})
 	if err != nil {
@@ -392,7 +394,7 @@ func (s *EtcdEmbed) Watch(ctx context.Context, opts ...registry.PluginOpOption)
 	op := registry.OpGet(opts...)
 
 	if len(op.Key) > 0 {
-		watchable := s.Server.Server.Watchable()
+		watchable := s.Embed.Server.Watchable()
 		ws := watchable.NewWatchStream()
 		defer ws.Close()
 
@@ -455,6 +457,29 @@ func (s *EtcdEmbed) Watch(ctx context.Context, opts ...registry.PluginOpOption)
 	return
 }
 
+func (s *EtcdEmbed) ReadyNotify() {
+	timeout := START_MANAGER_SERVER_TIMEOUT * time.Second
+	select {
+	case <-s.Embed.Server.ReadyNotify():
+		close(s.ready)
+		s.goroutine.Do(func(ctx context.Context) {
+			select {
+			case <-ctx.Done():
+				return
+			case err := <-s.Embed.Err():
+				s.err <- err
+			}
+		})
+	case <-time.After(timeout):
+		err := fmt.Errorf("timed out(%s)", timeout)
+		util.Logger().Errorf(err, "read notify failed")
+
+		s.Embed.Server.Stop()
+
+		s.err <- err
+	}
+}
+
 func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *mvccpb.Event) registry.ActionType {
 	switch evt.Type {
 	case mvccpb.DELETE:
@@ -488,8 +513,9 @@ func getEmbedInstance() mgr.PluginInstance {
 	addrs := beego.AppConfig.DefaultString("manager_addr", "http://127.0.0.1:2380")
 
 	inst := &EtcdEmbed{
-		err:   make(chan error, 1),
-		ready: make(chan int),
+		err:       make(chan error, 1),
+		ready:     make(chan int),
+		goroutine: util.NewGo(context.Background()),
 	}
 
 	if core.ServerInfo.Config.SslEnabled {
@@ -537,30 +563,14 @@ func getEmbedInstance() mgr.PluginInstance {
 		inst.err <- err
 		return inst
 	}
-	inst.Server = etcd
-
-	select {
-	case <-etcd.Server.ReadyNotify():
-		close(inst.ready)
-		go func() {
-			select {
-			case err := <-etcd.Err():
-				inst.err <- err
-			}
-		}()
-	case <-time.After(START_MANAGER_SERVER_TIMEOUT * time.Second):
-		message := "etcd server took too long to start"
-		util.Logger().Error(message, nil)
+	inst.Embed = etcd
 
-		etcd.Server.Stop()
-
-		inst.err <- errors.New(message)
-	}
+	inst.ReadyNotify()
 	return inst
 }
 
 func parseURL(addrs string) ([]url.URL, error) {
-	urls := []url.URL{}
+	var urls []url.URL
 	ips := strings.Split(addrs, ",")
 	for _, ip := range ips {
 		addr, err := url.Parse(ip)
diff --git a/server/plugin/infra/tracing/buildin/file_collector.go b/server/plugin/infra/tracing/buildin/file_collector.go
index bd48e5b..851b3fc 100644
--- a/server/plugin/infra/tracing/buildin/file_collector.go
+++ b/server/plugin/infra/tracing/buildin/file_collector.go
@@ -23,15 +23,19 @@ import (
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	"github.com/apache/incubator-servicecomb-service-center/server/core"
 	"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
+	"golang.org/x/net/context"
 	"os"
+	"strings"
 	"time"
 )
 
 type FileCollector struct {
 	Fd        *os.File
+	Timeout   time.Duration
 	Interval  time.Duration
 	BatchSize int
 	c         chan *zipkincore.Span
+	goroutine *util.GoRoutine
 }
 
 func (f *FileCollector) Collect(span *zipkincore.Span) error {
@@ -39,11 +43,16 @@ func (f *FileCollector) Collect(span *zipkincore.Span) error {
 		return fmt.Errorf("required FD to write")
 	}
 
-	f.c <- span
+	select {
+	case f.c <- span:
+	case <-time.After(f.Timeout):
+		util.Logger().Errorf(nil, "send span to handle channel timed out(%s)", f.Timeout)
+	}
 	return nil
 }
 
 func (f *FileCollector) Close() error {
+	f.goroutine.Close(true)
 	return f.Fd.Close()
 }
 
@@ -77,7 +86,7 @@ func (f *FileCollector) write(batch []*zipkincore.Span) (c int) {
 }
 
 func (f *FileCollector) checkFile() error {
-	if util.PathExist(f.Fd.Name()) {
+	if util.PathExist(f.Fd.Name()) || strings.Index(f.Fd.Name(), "/dev/") == 0 {
 		return nil
 	}
 
@@ -100,52 +109,54 @@ func (f *FileCollector) checkFile() error {
 	return nil
 }
 
-func (f *FileCollector) Run(stopCh <-chan struct{}) {
-	var (
-		batch []*zipkincore.Span
-		prev  []*zipkincore.Span
-		i     = f.Interval * 10
-		t     = time.NewTicker(f.Interval)
-		nr    = time.Now().Add(i)
-		max   = f.BatchSize * 2
-	)
-	for {
-		select {
-		case <-stopCh:
-			f.write(batch)
-			return
-		case span := <-f.c:
-			batch = append(batch, span)
-			if len(batch) >= f.BatchSize {
-				if len(batch) > max {
-					dispose := len(batch) - f.BatchSize
-					util.Logger().Errorf(nil, "backlog is full, dispose %d span(s), max: %d",
-						dispose, max)
-					batch = batch[dispose:] // allocate more
-				}
-				if c := f.write(batch); c == 0 {
-					continue
+func (f *FileCollector) Run() {
+	f.goroutine.Do(func(ctx context.Context) {
+		var (
+			batch []*zipkincore.Span
+			prev  []*zipkincore.Span
+			i     = f.Interval * 10
+			t     = time.NewTicker(f.Interval)
+			nr    = time.Now().Add(i)
+			max   = f.BatchSize * 2
+		)
+		for {
+			select {
+			case <-ctx.Done():
+				f.write(batch)
+				return
+			case span := <-f.c:
+				batch = append(batch, span)
+				if len(batch) >= f.BatchSize {
+					if len(batch) > max {
+						dispose := len(batch) - f.BatchSize
+						util.Logger().Errorf(nil, "backlog is full, dispose %d span(s), max: %d",
+							dispose, max)
+						batch = batch[dispose:] // allocate more
+					}
+					if c := f.write(batch); c == 0 {
+						continue
+					}
+					if prev != nil {
+						batch, prev = prev[:0], batch
+					} else {
+						prev, batch = batch, batch[len(batch):] // new one
+					}
 				}
-				if prev != nil {
-					batch, prev = prev[:0], batch
-				} else {
-					prev, batch = batch, batch[len(batch):] // new one
+			case <-t.C:
+				if time.Now().After(nr) {
+					util.LogRotateFile(f.Fd.Name(),
+						int(core.ServerInfo.Config.LogRotateSize),
+						int(core.ServerInfo.Config.LogBackupCount),
+					)
+					nr = time.Now().Add(i)
 				}
-			}
-		case <-t.C:
-			if time.Now().After(nr) {
-				util.LogRotateFile(f.Fd.Name(),
-					int(core.ServerInfo.Config.LogRotateSize),
-					int(core.ServerInfo.Config.LogBackupCount),
-				)
-				nr = time.Now().Add(i)
-			}
 
-			if c := f.write(batch); c > 0 {
-				batch = batch[:0]
+				if c := f.write(batch); c > 0 {
+					batch = batch[:0]
+				}
 			}
 		}
-	}
+	})
 }
 
 func NewFileCollector(path string) (*FileCollector, error) {
@@ -155,10 +166,12 @@ func NewFileCollector(path string) (*FileCollector, error) {
 	}
 	fc := &FileCollector{
 		Fd:        fd,
+		Timeout:   5 * time.Second,
 		Interval:  10 * time.Second,
 		BatchSize: 100,
 		c:         make(chan *zipkincore.Span, 1000),
+		goroutine: util.NewGo(context.Background()),
 	}
-	util.Go(fc.Run)
+	fc.Run()
 	return fc, nil
 }
diff --git a/server/plugin/infra/tracing/buildin/file_collector_test.go b/server/plugin/infra/tracing/buildin/file_collector_test.go
index 075fdae..4f3e150 100644
--- a/server/plugin/infra/tracing/buildin/file_collector_test.go
+++ b/server/plugin/infra/tracing/buildin/file_collector_test.go
@@ -17,34 +17,33 @@
 package buildin
 
 import (
+	"fmt"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
+	"golang.org/x/net/context"
 	"os"
 	"testing"
 	"time"
 )
 
 func TestFileCollector_Collect(t *testing.T) {
-	fileName := "./test"
-	fd, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
-	if err != nil {
-		t.FailNow()
-	}
 	fc := &FileCollector{
-		Fd:        fd,
+		Fd:        os.Stdout,
+		Timeout:   1 * time.Second,
 		Interval:  100 * time.Second,
 		BatchSize: 2,
-		c:         make(chan *zipkincore.Span, 1000),
+		c:         make(chan *zipkincore.Span, 100),
+		goroutine: util.NewGo(context.Background()),
 	}
 	defer func() {
 		fc.Close()
-		os.Remove(fileName)
 	}()
-	util.Go(fc.Run)
+	fc.Run()
 
-	for i := int64(0); i < 3; i++ {
-		err := fc.Collect(&zipkincore.Span{ParentID: &i, TraceIDHigh: &i})
+	for i := 0; i < 10; i++ {
+		err := fc.Collect(&zipkincore.Span{})
 		if err != nil {
+			fmt.Println(err)
 			t.FailNow()
 		}
 	}
diff --git a/server/plugin/infra/tracing/buildin/span.go b/server/plugin/infra/tracing/buildin/span.go
index 5b8011d..3d20088 100644
--- a/server/plugin/infra/tracing/buildin/span.go
+++ b/server/plugin/infra/tracing/buildin/span.go
@@ -61,7 +61,9 @@ type Endpoint struct {
 func (s *Span) FromZipkinSpan(span *zipkincore.Span) {
 	traceId := new(types.TraceID)
 	traceId.Low = uint64(span.TraceID)
-	traceId.High = uint64(*(span.TraceIDHigh))
+	if span.TraceIDHigh != nil {
+		traceId.High = uint64(*(span.TraceIDHigh))
+	}
 	s.TraceID = traceId.ToHex()
 	s.Duration = span.Duration
 
diff --git a/server/plugin/infra/tracing/buildin/span_test.go b/server/plugin/infra/tracing/buildin/span_test.go
index c3dceb4..069e6a9 100644
--- a/server/plugin/infra/tracing/buildin/span_test.go
+++ b/server/plugin/infra/tracing/buildin/span_test.go
@@ -158,4 +158,12 @@ func TestFromZipkinSpan(t *testing.T) {
 		t.FailNow()
 	}
 	fmt.Println(string(b))
+
+	s = FromZipkinSpan(&zipkincore.Span{})
+	b, err = json.Marshal(s)
+	if err != nil {
+		fmt.Println("TestFromZipkinSpan Marshal", err)
+		t.FailNow()
+	}
+	fmt.Println(string(b))
 }
diff --git a/server/server.go b/server/server.go
index a546879..15e0abd 100644
--- a/server/server.go
+++ b/server/server.go
@@ -43,6 +43,7 @@ func init() {
 		store:         st.Store(),
 		notifyService: nf.GetNotifyService(),
 		apiServer:     GetAPIServer(),
+		goroutine:     util.NewGo(context.Background()),
 	}
 }
 
@@ -50,6 +51,7 @@ type ServiceCenterServer struct {
 	apiServer     *APIServer
 	notifyService *nf.NotifyService
 	store         *st.KvStore
+	goroutine     *util.GoRoutine
 }
 
 func (s *ServiceCenterServer) Run() {
@@ -74,7 +76,7 @@ func (s *ServiceCenterServer) waitForQuit() {
 
 	s.Stop()
 
-	util.Logger().Warn("service center quit", nil)
+	util.Logger().Debugf("service center stopped")
 }
 
 func (s *ServiceCenterServer) needUpgrade() bool {
@@ -119,12 +121,12 @@ func (s *ServiceCenterServer) autoCompactBackend() {
 		util.Logger().Errorf(err, "invalid compact interval %s, reset to default interval 12h", core.ServerInfo.Config.CompactInterval)
 		interval = 12 * time.Hour
 	}
-	util.Go(func(stopCh <-chan struct{}) {
+	s.goroutine.Do(func(ctx context.Context) {
 		util.Logger().Infof("enabled the automatic compact mechanism, compact once every %s, reserve %d",
 			core.ServerInfo.Config.CompactInterval, delta)
 		for {
 			select {
-			case <-stopCh:
+			case <-ctx.Done():
 				return
 			case <-time.After(interval):
 				lock, err := mux.Try(mux.GLOBAL_LOCK)
@@ -133,7 +135,7 @@ func (s *ServiceCenterServer) autoCompactBackend() {
 					continue
 				}
 
-				backend.Registry().Compact(context.Background(), delta)
+				backend.Registry().Compact(ctx, delta)
 
 				lock.Unlock()
 			}
@@ -190,9 +192,7 @@ func (s *ServiceCenterServer) Stop() {
 		s.store.Stop()
 	}
 
-	util.GoCloseAndWait()
-
-	backend.Registry().Close()
+	s.goroutine.Close(true)
 }
 
 func Run() {
diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go
index 7bbe5df..6961c81 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -50,7 +50,7 @@ func (h *DependencyEventHandler) OnEvent(evt *store.KvEvent) {
 }
 
 func (h *DependencyEventHandler) loop() {
-	util.Go(func(stopCh <-chan struct{}) {
+	util.Go(func(ctx context.Context) {
 		waitDelayIndex := 0
 		waitDelay := []int{1, 1, 5, 10, 20, 30, 60}
 		retry := func() {
@@ -64,7 +64,7 @@ func (h *DependencyEventHandler) loop() {
 		}
 		for {
 			select {
-			case <-stopCh:
+			case <-ctx.Done():
 				return
 			case <-h.signals.Chan():
 				lock, err := mux.Try(mux.DEP_QUEUE_LOCK)
diff --git a/server/service/instances.go b/server/service/instances.go
index 48c7029..eb94506 100644
--- a/server/service/instances.go
+++ b/server/service/instances.go
@@ -370,19 +370,7 @@ func (s *InstanceService) HeartbeatSet(ctx context.Context, in *pb.HeartbeatSetR
 			existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId] = true
 			noMultiCounter++
 		}
-		go func(element *pb.HeartbeatSetElement) {
-			hbRst := &pb.InstanceHbRst{
-				ServiceId:  element.ServiceId,
-				InstanceId: element.InstanceId,
-				ErrMessage: "",
-			}
-			_, _, err, _ := serviceUtil.HeartbeatUtil(ctx, domainProject, element.ServiceId, element.InstanceId)
-			if err != nil {
-				hbRst.ErrMessage = err.Error()
-				util.Logger().Errorf(err, "heartbeat set failed, %s/%s", element.ServiceId, element.InstanceId)
-			}
-			instancesHbRst <- hbRst
-		}(heartbeatElement)
+		util.Go(getHeartbeatFunc(ctx, domainProject, instancesHbRst, heartbeatElement))
 	}
 	count := 0
 	successFlag := false
@@ -415,6 +403,22 @@ func (s *InstanceService) HeartbeatSet(ctx context.Context, in *pb.HeartbeatSetR
 	}
 }
 
+func getHeartbeatFunc(ctx context.Context, domainProject string, instancesHbRst chan<- *pb.InstanceHbRst, element *pb.HeartbeatSetElement) func(context.Context) {
+	return func(_ context.Context) {
+		hbRst := &pb.InstanceHbRst{
+			ServiceId:  element.ServiceId,
+			InstanceId: element.InstanceId,
+			ErrMessage: "",
+		}
+		_, _, err, _ := serviceUtil.HeartbeatUtil(ctx, domainProject, element.ServiceId, element.InstanceId)
+		if err != nil {
+			hbRst.ErrMessage = err.Error()
+			util.Logger().Errorf(err, "heartbeat set failed, %s/%s", element.ServiceId, element.InstanceId)
+		}
+		instancesHbRst <- hbRst
+	}
+}
+
 func (s *InstanceService) GetOneInstance(ctx context.Context, in *pb.GetOneInstanceRequest) (*pb.GetOneInstanceResponse, error) {
 	checkErr := s.getInstancePreCheck(ctx, in)
 	if checkErr != nil {
@@ -723,7 +727,6 @@ func (s *InstanceService) UpdateInstanceProperties(ctx context.Context, in *pb.U
 	}, nil
 }
 
-
 func (s *InstanceService) WatchPreOpera(ctx context.Context, in *pb.WatchInstanceRequest) error {
 	if in == nil || len(in.SelfServiceId) == 0 {
 		return errors.New("Request format invalid.")
@@ -742,7 +745,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream pb.ServiceIn
 		return err
 	}
 	domainProject := util.ParseDomainProject(stream.Context())
-	watcher := nf.NewInstanceWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/")
+	watcher := nf.NewInstanceListWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/", nil)
 	err = nf.GetNotifyService().AddSubscriber(watcher)
 	util.Logger().Infof("start watch instance status, watcher %s %s", watcher.Subject(), watcher.Id())
 	return nf.HandleWatchJob(watcher, stream, nf.GetNotifyService().Config.NotifyTimeout)
@@ -754,7 +757,7 @@ func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstan
 		nf.EstablishWebSocketError(conn, err)
 		return
 	}
-	nf.DoWebSocketWatch(ctx, in.SelfServiceId, conn)
+	nf.DoWebSocketListAndWatch(ctx, in.SelfServiceId, nil, conn)
 }
 
 func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
diff --git a/server/service/microservices.go b/server/service/microservices.go
index 4fe47d2..6663ab9 100644
--- a/server/service/microservices.go
+++ b/server/service/microservices.go
@@ -359,11 +359,6 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De
 			nuoMultilCount++
 		}
 
-		serviceRst := &pb.DelServicesRspInfo{
-			ServiceId:  serviceId,
-			ErrMessage: "",
-		}
-
 		//检查服务ID合法性
 		in := &pb.DeleteServiceRequest{
 			ServiceId: serviceId,
@@ -372,22 +367,15 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De
 		err := apt.Validate(in)
 		if err != nil {
 			util.Logger().Errorf(err, "delete micro-service failed, serviceId is %s: invalid parameters.", in.ServiceId)
-			serviceRst.ErrMessage = err.Error()
-			serviceRespChan <- serviceRst
+			serviceRespChan <- &pb.DelServicesRspInfo{
+				ServiceId:  serviceId,
+				ErrMessage: err.Error(),
+			}
 			continue
 		}
 
 		//执行删除服务操作
-		go func(serviceItem string) {
-			resp, err := s.DeleteServicePri(ctx, serviceItem, request.Force)
-			if err != nil {
-				serviceRst.ErrMessage = err.Error()
-			} else if resp.Code != pb.Response_SUCCESS {
-				serviceRst.ErrMessage = resp.Message
-			}
-
-			serviceRespChan <- serviceRst
-		}(serviceId)
+		util.Go(s.getDeleteServiceFunc(ctx, serviceId, request.Force, serviceRespChan))
 	}
 
 	//获取批量删除服务的结果
@@ -419,6 +407,23 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De
 	return resp, nil
 }
 
+func (s *MicroServiceService) getDeleteServiceFunc(ctx context.Context, serviceId string, force bool, serviceRespChan chan<- *pb.DelServicesRspInfo) func(context.Context) {
+	return func(_ context.Context) {
+		serviceRst := &pb.DelServicesRspInfo{
+			ServiceId:  serviceId,
+			ErrMessage: "",
+		}
+		resp, err := s.DeleteServicePri(ctx, serviceId, force)
+		if err != nil {
+			serviceRst.ErrMessage = err.Error()
+		} else if resp.Code != pb.Response_SUCCESS {
+			serviceRst.ErrMessage = resp.Message
+		}
+
+		serviceRespChan <- serviceRst
+	}
+}
+
 func (s *MicroServiceService) GetOne(ctx context.Context, in *pb.GetServiceRequest) (*pb.GetServiceResponse, error) {
 	if in == nil || len(in.ServiceId) == 0 {
 		return &pb.GetServiceResponse{
@@ -437,7 +442,7 @@ func (s *MicroServiceService) GetOne(ctx context.Context, in *pb.GetServiceReque
 	service, err := serviceUtil.GetService(ctx, domainProject, in.ServiceId)
 
 	if err != nil {
-		util.Logger().Errorf(err, "get micro-service failed, serviceId is %s: inner err,get service failed.", in.ServiceId)
+		util.Logger().Errorf(err, "get micro-service failed, serviceId is %s: inner err, get service failed.", in.ServiceId)
 		return &pb.GetServiceResponse{
 			Response: pb.CreateResponse(scerr.ErrInternal, "Get service file failed."),
 		}, err
@@ -655,7 +660,7 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
 	//create rules
 	if in.Rules != nil && len(in.Rules) != 0 {
 		chanLen++
-		go func() {
+		util.Go(func(_ context.Context) {
 			req := &pb.AddServiceRulesRequest{
 				ServiceId: serviceId,
 				Rules:     in.Rules,
@@ -670,12 +675,12 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
 				chanRsp.Message = rsp.Response.Message
 			}
 			createRespChan <- chanRsp
-		}()
+		})
 	}
 	//create tags
 	if in.Tags != nil && len(in.Tags) != 0 {
 		chanLen++
-		go func() {
+		util.Go(func(_ context.Context) {
 			req := &pb.AddServiceTagsRequest{
 				ServiceId: serviceId,
 				Tags:      in.Tags,
@@ -690,12 +695,12 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
 				chanRsp.Message = rsp.Response.Message
 			}
 			createRespChan <- chanRsp
-		}()
+		})
 	}
 	// create instance
 	if in.Instances != nil && len(in.Instances) != 0 {
 		chanLen++
-		go func() {
+		util.Go(func(_ context.Context) {
 			chanRsp := &pb.Response{}
 			for _, ins := range in.Instances {
 				req := &pb.RegisterInstanceRequest{
@@ -711,7 +716,7 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
 				}
 				createRespChan <- chanRsp
 			}
-		}()
+		})
 	}
 
 	// handle result
diff --git a/server/service/notification/listwatcher.go b/server/service/notification/listwatcher.go
index 8a34076..de0e2c6 100644
--- a/server/service/notification/listwatcher.go
+++ b/server/service/notification/listwatcher.go
@@ -19,6 +19,7 @@ package notification
 import (
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+	"golang.org/x/net/context"
 	"time"
 )
 
@@ -44,10 +45,10 @@ func (w *ListWatcher) OnAccept() {
 	}
 
 	util.Logger().Debugf("accepted by notify service, %s watcher %s %s", w.Type(), w.Id(), w.Subject())
-	go w.listAndPublishJobs()
+	util.Go(w.listAndPublishJobs)
 }
 
-func (w *ListWatcher) listAndPublishJobs() {
+func (w *ListWatcher) listAndPublishJobs(_ context.Context) {
 	defer close(w.listCh)
 	if w.ListFunc == nil {
 		return
@@ -112,10 +113,6 @@ func NewWatchJob(nType NotifyType, subscriberId, subject string, rev int64, resp
 	}
 }
 
-func NewWatcher(nType NotifyType, id string, subject string) *ListWatcher {
-	return NewListWatcher(nType, id, subject, nil)
-}
-
 func NewListWatcher(nType NotifyType, id string, subject string,
 	listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher {
 	watcher := &ListWatcher{
diff --git a/server/service/notification/notification_service.go b/server/service/notification/notification_service.go
index e2d7864..63f78a2 100644
--- a/server/service/notification/notification_service.go
+++ b/server/service/notification/notification_service.go
@@ -20,6 +20,7 @@ import (
 	"container/list"
 	"errors"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
+	"golang.org/x/net/context"
 	"sync"
 	"time"
 )
@@ -33,7 +34,8 @@ var notifyService *NotifyService
 
 func init() {
 	notifyService = &NotifyService{
-		isClose: true,
+		isClose:   true,
+		goroutine: util.NewGo(context.Background()),
 	}
 }
 
@@ -46,13 +48,14 @@ type serviceIndex map[NotifyType]subscriberSubjectIndex
 type NotifyService struct {
 	Config NotifyServiceConfig
 
-	services serviceIndex
-	queues   map[NotifyType]chan NotifyJob
-	waits    sync.WaitGroup
-	mutexes  map[NotifyType]*sync.Mutex
-	err      chan error
-	closeMux sync.RWMutex
-	isClose  bool
+	services  serviceIndex
+	queues    map[NotifyType]chan NotifyJob
+	waits     sync.WaitGroup
+	mutexes   map[NotifyType]*sync.Mutex
+	err       chan error
+	closeMux  sync.RWMutex
+	isClose   bool
+	goroutine *util.GoRoutine
 }
 
 func (s *NotifyService) Err() <-chan error {
@@ -150,41 +153,52 @@ func (s *NotifyService) AddJob(job NotifyJob) error {
 	}
 }
 
-func (s *NotifyService) publish2Subscriber(t NotifyType) {
-	defer s.waits.Done()
-	for job := range s.queues[t] {
-		util.Logger().Infof("notification service got a job %s: %s to notify subscriber %s",
-			job.Type(), job.Subject(), job.SubscriberId())
+func (s *NotifyService) getPublish2SubscriberFunc(t NotifyType) func(context.Context) {
+	return func(ctx context.Context) {
+		defer s.waits.Done()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case job, ok := <-s.queues[t]:
+				if !ok {
+					return
+				}
 
-		s.mutexes[t].Lock()
+				util.Logger().Infof("notification service got a job %s: %s to notify subscriber %s",
+					job.Type(), job.Subject(), job.SubscriberId())
 
-		if s.Closed() && len(s.services[t]) == 0 {
-			s.mutexes[t].Unlock()
-			return
-		}
+				s.mutexes[t].Lock()
 
-		m, ok := s.services[t][job.Subject()]
-		if ok {
-			// publish的subject如果带上id,则单播,否则广播
-			if len(job.SubscriberId()) != 0 {
-				ns, ok := m[job.SubscriberId()]
+				if s.Closed() && len(s.services[t]) == 0 {
+					s.mutexes[t].Unlock()
+					return
+				}
+
+				m, ok := s.services[t][job.Subject()]
 				if ok {
-					for n := ns.Front(); n != nil; n = n.Next() {
-						n.Value.(Subscriber).OnMessage(job)
+					// publish的subject如果带上id,则单播,否则广播
+					if len(job.SubscriberId()) != 0 {
+						ns, ok := m[job.SubscriberId()]
+						if ok {
+							for n := ns.Front(); n != nil; n = n.Next() {
+								n.Value.(Subscriber).OnMessage(job)
+							}
+						}
+						s.mutexes[t].Unlock()
+						continue
+					}
+					for key := range m {
+						ns := m[key]
+						for n := ns.Front(); n != nil; n = n.Next() {
+							n.Value.(Subscriber).OnMessage(job)
+						}
 					}
 				}
+
 				s.mutexes[t].Unlock()
-				continue
-			}
-			for key := range m {
-				ns := m[key]
-				for n := ns.Front(); n != nil; n = n.Next() {
-					n.Value.(Subscriber).OnMessage(job)
-				}
 			}
 		}
-
-		s.mutexes[t].Unlock()
 	}
 }
 
@@ -227,7 +241,7 @@ func (s *NotifyService) Start() {
 	util.Logger().Debugf("notify service is started with config %s", s.Config)
 
 	for i := NotifyType(0); i != typeEnd; i++ {
-		go s.publish2Subscriber(i)
+		s.goroutine.Do(s.getPublish2SubscriberFunc(i))
 	}
 }
 
@@ -255,6 +269,8 @@ func (s *NotifyService) Stop() {
 
 	close(s.err)
 
+	s.goroutine.Close(true)
+
 	util.Logger().Debug("notify service stopped.")
 }
 
diff --git a/server/service/notification/watch_util.go b/server/service/notification/watch_util.go
index c938719..9a7e26c 100644
--- a/server/service/notification/watch_util.go
+++ b/server/service/notification/watch_util.go
@@ -62,6 +62,7 @@ type WebSocketHandler struct {
 	watcher         *ListWatcher
 	needPingWatcher bool
 	closed          chan struct{}
+	goroutine       *util.GoRoutine
 }
 
 func (wh *WebSocketHandler) Init() error {
@@ -101,7 +102,7 @@ func (wh *WebSocketHandler) websocketHeartbeat(messageType int) error {
 	return nil
 }
 
-func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage() {
+func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage(ctx context.Context) {
 	defer close(wh.closed)
 
 	remoteAddr := wh.conn.RemoteAddr().String()
@@ -128,17 +129,23 @@ func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage() {
 	})
 
 	for {
-		_, _, err := wh.conn.ReadMessage()
-		if err != nil {
-			wh.watcher.SetError(err)
+		select {
+		case <-ctx.Done():
 			return
+		default:
+			_, _, err := wh.conn.ReadMessage()
+			if err != nil {
+				wh.watcher.SetError(err)
+				return
+			}
 		}
 	}
 }
 
 func (wh *WebSocketHandler) HandleWatchWebSocketJob() {
-	remoteAddr := wh.conn.RemoteAddr().String()
+	wh.goroutine.Do(wh.HandleWatchWebSocketControlMessage)
 
+	remoteAddr := wh.conn.RemoteAddr().String()
 	for {
 		select {
 		case <-wh.closed:
@@ -224,8 +231,10 @@ func (wh *WebSocketHandler) HandleWatchWebSocketJob() {
 }
 
 func (wh *WebSocketHandler) Close(code int, text string) error {
+	defer wh.goroutine.Close(true)
+
 	remoteAddr := wh.conn.RemoteAddr().String()
-	message := []byte{}
+	var message []byte
 	if code != websocket.CloseNoStatusReceived {
 		message = websocket.FormatCloseMessage(code, text)
 	}
@@ -238,18 +247,6 @@ func (wh *WebSocketHandler) Close(code int, text string) error {
 	return nil
 }
 
-func DoWebSocketWatch(ctx context.Context, serviceId string, conn *websocket.Conn) {
-	domainProject := util.ParseDomainProject(ctx)
-	handler := &WebSocketHandler{
-		ctx:             ctx,
-		conn:            conn,
-		watcher:         NewInstanceWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/"),
-		needPingWatcher: true,
-		closed:          make(chan struct{}),
-	}
-	processHandler(handler)
-}
-
 func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
 	domainProject := util.ParseDomainProject(ctx)
 	handler := &WebSocketHandler{
@@ -258,6 +255,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([]
 		watcher:         NewInstanceListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f),
 		needPingWatcher: true,
 		closed:          make(chan struct{}),
+		goroutine:       util.NewGo(context.Background()),
 	}
 	processHandler(handler)
 }
@@ -266,7 +264,6 @@ func processHandler(handler *WebSocketHandler) {
 	if err := handler.Init(); err != nil {
 		return
 	}
-	go handler.HandleWatchWebSocketControlMessage()
 	handler.HandleWatchWebSocketJob()
 }
 
@@ -294,10 +291,6 @@ func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey
 	}
 }
 
-func NewInstanceWatcher(selfServiceId, instanceRoot string) *ListWatcher {
-	return NewWatcher(INSTANCE, selfServiceId, instanceRoot)
-}
-
 func NewInstanceListWatcher(selfServiceId, instanceRoot string, listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher {
 	return NewListWatcher(INSTANCE, selfServiceId, instanceRoot, listFunc)
 }
diff --git a/server/service/util/dependency.go b/server/service/util/dependency.go
index 3c91256..fb5741b 100644
--- a/server/service/util/dependency.go
+++ b/server/service/util/dependency.go
@@ -557,11 +557,10 @@ type Dependency struct {
 
 func (dep *Dependency) RemoveConsumerOfProviderRule() {
 	dep.chanNum++
-	go dep.removeConsumerOfProviderRule()
+	util.Go(dep.removeConsumerOfProviderRule)
 }
 
-func (dep *Dependency) removeConsumerOfProviderRule() {
-	ctx := context.TODO()
+func (dep *Dependency) removeConsumerOfProviderRule(ctx context.Context) {
 	opts := make([]registry.PluginOp, 0, len(dep.removedDependencyRuleList))
 	for _, providerRule := range dep.removedDependencyRuleList {
 		proProkey := apt.GenerateProviderDependencyRuleKey(providerRule.Tenant, providerRule)
@@ -605,11 +604,10 @@ func (dep *Dependency) removeConsumerOfProviderRule() {
 
 func (dep *Dependency) AddConsumerOfProviderRule() {
 	dep.chanNum++
-	go dep.addConsumerOfProviderRule()
+	util.Go(dep.addConsumerOfProviderRule)
 }
 
-func (dep *Dependency) addConsumerOfProviderRule() {
-	ctx := context.TODO()
+func (dep *Dependency) addConsumerOfProviderRule(ctx context.Context) {
 	opts := []registry.PluginOp{}
 	for _, providerRule := range dep.NewDependencyRuleList {
 		proProkey := apt.GenerateProviderDependencyRuleKey(providerRule.Tenant, providerRule)

-- 
To stop receiving notification emails like this one, please contact
littlecui@apache.org.