You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2022/01/21 07:37:26 UTC

[servicecomb-service-center] branch master updated: Bug: remove syncer.enabled (#1240)

This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 57e24f7  Bug: remove syncer.enabled (#1240)
57e24f7 is described below

commit 57e24f71ba874c03462a79c41eb74bccb82a3e94
Author: little-cui <su...@qq.com>
AuthorDate: Fri Jan 21 15:37:19 2022 +0800

    Bug: remove syncer.enabled (#1240)
---
 datasource/etcd/event/instance_event_handler.go    |  33 ---
 datasource/mongo/event/instance_event_handler.go   |  43 ----
 .../mongo/event/instance_event_handler_test.go     |  16 --
 etc/conf/app.yaml                                  |   3 -
 eventbase/bootstrap/bootstrap.go                   |   1 +
 eventbase/datasource/mongo/task/task_dao.go        |   8 +-
 .../datasource/mongo/tombstone/tombstone_dao.go    |   8 +-
 server/bootstrap/bootstrap.go                      |   3 -
 server/rest/syncer/service/service.go              |  50 -----
 server/rest/syncer/service/service_test.go         |  81 -------
 server/rest/syncer/syncer.go                       |  34 ---
 server/rest/syncer/syncer_controller.go            |  41 ----
 server/server.go                                   |  19 +-
 server/syncernotify/common.go                      |  26 ---
 server/syncernotify/notify_service.go              |  83 -------
 server/syncernotify/syncer_publisher.go            | 107 ---------
 server/syncernotify/websocket.go                   | 246 ---------------------
 server/syncernotify/websocket_test.go              | 152 -------------
 18 files changed, 12 insertions(+), 942 deletions(-)

diff --git a/datasource/etcd/event/instance_event_handler.go b/datasource/etcd/event/instance_event_handler.go
index c3b4a4c..a83db1c 100644
--- a/datasource/etcd/event/instance_event_handler.go
+++ b/datasource/etcd/event/instance_event_handler.go
@@ -27,20 +27,13 @@ import (
 	"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
 	serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
-	"github.com/apache/servicecomb-service-center/pkg/dump"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/event"
 	quotasvc "github.com/apache/servicecomb-service-center/server/service/quota"
-	"github.com/apache/servicecomb-service-center/server/syncernotify"
 	pb "github.com/go-chassis/cari/discovery"
 )
 
-const (
-	msKeyPrefix = "/cse-sr/ms/files/"
-	sep         = "/"
-)
-
 // InstanceEventHandler is the handler to handle:
 // 1. report instance metrics
 // 2. recover the instance quota
@@ -80,10 +73,6 @@ func (h *InstanceEventHandler) OnEvent(evt kvstore.Event) {
 		return
 	}
 
-	if !syncernotify.GetSyncerNotifyCenter().Closed() {
-		NotifySyncerInstanceEvent(evt, domainProject, ms)
-	}
-
 	if event.Center().Closed() {
 		log.Warn(fmt.Sprintf("caught [%s] instance[%s/%s] event, endpoints %v, but notify service is closed",
 			action, providerID, providerInstanceID, instance.Endpoints))
@@ -129,25 +118,3 @@ func PublishInstanceEvent(evt kvstore.Event, serviceKey *pb.MicroServiceKey, sub
 		}
 	}
 }
-
-func NotifySyncerInstanceEvent(evt kvstore.Event, domainProject string, ms *pb.MicroService) {
-	msInstance := evt.KV.Value.(*pb.MicroServiceInstance)
-
-	serviceKey := msKeyPrefix + domainProject + sep + ms.ServiceId
-	msKV := &dump.KV{Key: serviceKey, ClusterName: evt.KV.ClusterName}
-	service := &dump.Microservice{KV: msKV, Value: ms}
-
-	instKey := string(evt.KV.Key)
-	instKV := &dump.KV{Key: instKey, ClusterName: evt.KV.ClusterName}
-	instance := &dump.Instance{KV: instKV, Value: msInstance}
-
-	instEvent := &dump.WatchInstanceChangedEvent{
-		Action:   string(evt.Type),
-		Service:  service,
-		Instance: instance,
-	}
-
-	syncernotify.GetSyncerNotifyCenter().AddEvent(instEvent)
-
-	log.Debug(fmt.Sprintf("success to add instance change event: %v to event queue", instEvent))
-}
diff --git a/datasource/mongo/event/instance_event_handler.go b/datasource/mongo/event/instance_event_handler.go
index a14fd1b..40ce510 100644
--- a/datasource/mongo/event/instance_event_handler.go
+++ b/datasource/mongo/event/instance_event_handler.go
@@ -28,12 +28,10 @@ import (
 	"github.com/apache/servicecomb-service-center/datasource/mongo"
 	"github.com/apache/servicecomb-service-center/datasource/mongo/model"
 	"github.com/apache/servicecomb-service-center/datasource/mongo/sd"
-	"github.com/apache/servicecomb-service-center/pkg/dump"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	simple "github.com/apache/servicecomb-service-center/pkg/time"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/event"
-	"github.com/apache/servicecomb-service-center/server/syncernotify"
 )
 
 // InstanceEventHandler is the handler to handle events
@@ -69,9 +67,6 @@ func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
 	if action == discovery.EVT_INIT {
 		return
 	}
-	if !syncernotify.GetSyncerNotifyCenter().Closed() {
-		NotifySyncerInstanceEvent(evt, microService)
-	}
 	consumerIDs, err := mongo.GetConsumerIDs(ctx, microService)
 	if err != nil {
 		log.Error(fmt.Sprintf("get service[%s][%s/%s/%s/%s]'s consumerIDs failed",
@@ -102,41 +97,3 @@ func PublishInstanceEvent(evt sd.MongoEvent, serviceKey *discovery.MicroServiceK
 		}
 	}
 }
-
-func NotifySyncerInstanceEvent(event sd.MongoEvent, microService *discovery.MicroService) {
-	instance := event.Value.(model.Instance).Instance
-	log.Info(fmt.Sprintf("instanceId : %s and serviceId : %s in NotifySyncerInstanceEvent", instance.InstanceId, instance.ServiceId))
-	instanceKey := util.StringJoin([]string{datasource.InstanceKeyPrefix, event.Value.(model.Instance).Domain,
-		event.Value.(model.Instance).Project, instance.ServiceId, instance.InstanceId}, datasource.SPLIT)
-
-	instanceKv := dump.KV{
-		Key:   instanceKey,
-		Value: instance,
-	}
-
-	dumpInstance := dump.Instance{
-		KV:    &instanceKv,
-		Value: instance,
-	}
-	serviceKey := util.StringJoin([]string{datasource.ServiceKeyPrefix, event.Value.(model.Instance).Domain,
-		event.Value.(model.Instance).Project, instance.ServiceId}, datasource.SPLIT)
-	serviceKv := dump.KV{
-		Key:   serviceKey,
-		Value: microService,
-	}
-
-	dumpService := dump.Microservice{
-		KV:    &serviceKv,
-		Value: microService,
-	}
-
-	instEvent := &dump.WatchInstanceChangedEvent{
-		Action:   string(event.Type),
-		Service:  &dumpService,
-		Instance: &dumpInstance,
-	}
-	syncernotify.GetSyncerNotifyCenter().AddEvent(instEvent)
-
-	log.Debug(fmt.Sprintf("success to add instance change event action [%s], instanceKey : %s to event queue",
-		instEvent.Action, instanceKey))
-}
diff --git a/datasource/mongo/event/instance_event_handler_test.go b/datasource/mongo/event/instance_event_handler_test.go
index 076f57f..8e53077 100644
--- a/datasource/mongo/event/instance_event_handler_test.go
+++ b/datasource/mongo/event/instance_event_handler_test.go
@@ -28,7 +28,6 @@ import (
 	"github.com/apache/servicecomb-service-center/datasource/mongo/event"
 	"github.com/apache/servicecomb-service-center/datasource/mongo/model"
 	"github.com/apache/servicecomb-service-center/datasource/mongo/sd"
-	"github.com/apache/servicecomb-service-center/server/syncernotify"
 	_ "github.com/apache/servicecomb-service-center/test"
 )
 
@@ -49,21 +48,6 @@ func TestInstanceEventHandler_OnEvent(t *testing.T) {
 		h.OnEvent(mongoAssign())
 		assert.Error(t, assert.AnError)
 	})
-	t.Run("OnEvent test when syncer notify center open", func(t *testing.T) {
-		syncernotify.GetSyncerNotifyCenter().Start()
-		h := event.InstanceEventHandler{}
-		h.OnEvent(mongoAssign())
-		assert.Equal(t, false, t.Failed(), "add event succeed")
-	})
-}
-
-func TestNotifySyncerInstanceEvent(t *testing.T) {
-	t.Run("test when data is ok", func(t *testing.T) {
-		mongoEvent := mongoAssign()
-		microService := getMicroService()
-		event.NotifySyncerInstanceEvent(mongoEvent, microService)
-		assert.Equal(t, false, t.Failed())
-	})
 }
 
 func mongoAssign() sd.MongoEvent {
diff --git a/etc/conf/app.yaml b/etc/conf/app.yaml
index d0700ae..ed039f4 100644
--- a/etc/conf/app.yaml
+++ b/etc/conf/app.yaml
@@ -220,9 +220,6 @@ auth:
 auditlog:
   kind:
 
-syncer:
-  enabled: false
-
 heartbeat:
   # configuration of websocket long connection
   websocket:
diff --git a/eventbase/bootstrap/bootstrap.go b/eventbase/bootstrap/bootstrap.go
index 977089f..c304b12 100644
--- a/eventbase/bootstrap/bootstrap.go
+++ b/eventbase/bootstrap/bootstrap.go
@@ -18,6 +18,7 @@
 package bootstrap
 
 import (
+	// datasource implements
 	_ "github.com/apache/servicecomb-service-center/eventbase/datasource/etcd"
 	_ "github.com/apache/servicecomb-service-center/eventbase/datasource/mongo"
 )
diff --git a/eventbase/datasource/mongo/task/task_dao.go b/eventbase/datasource/mongo/task/task_dao.go
index 8d6c180..bf3798e 100644
--- a/eventbase/datasource/mongo/task/task_dao.go
+++ b/eventbase/datasource/mongo/task/task_dao.go
@@ -65,10 +65,10 @@ func (d *Dao) Delete(ctx context.Context, tasks ...*sync.Task) error {
 	for i, task := range tasks {
 		tasksIDs[i] = task.ID
 		dFilter := bson.D{
-			{model.ColumnDomain, task.Domain},
-			{model.ColumnProject, task.Project},
-			{model.ColumnID, task.ID},
-			{model.ColumnTimestamp, task.Timestamp},
+			{Key: model.ColumnDomain, Value: task.Domain},
+			{Key: model.ColumnProject, Value: task.Project},
+			{Key: model.ColumnID, Value: task.ID},
+			{Key: model.ColumnTimestamp, Value: task.Timestamp},
 		}
 		filter = append(filter, dFilter)
 	}
diff --git a/eventbase/datasource/mongo/tombstone/tombstone_dao.go b/eventbase/datasource/mongo/tombstone/tombstone_dao.go
index 85621fc..891571f 100644
--- a/eventbase/datasource/mongo/tombstone/tombstone_dao.go
+++ b/eventbase/datasource/mongo/tombstone/tombstone_dao.go
@@ -68,10 +68,10 @@ func (d *Dao) Delete(ctx context.Context, tombstones ...*sync.Tombstone) error {
 	for i, tombstone := range tombstones {
 		tombstonesIDs[i] = tombstone.ResourceID
 		dFilter := bson.D{
-			{model.ColumnResourceID, tombstone.ResourceID},
-			{model.ColumnResourceType, tombstone.ResourceType},
-			{model.ColumnDomain, tombstone.Domain},
-			{model.ColumnProject, tombstone.Project},
+			{Key: model.ColumnResourceID, Value: tombstone.ResourceID},
+			{Key: model.ColumnResourceType, Value: tombstone.ResourceType},
+			{Key: model.ColumnDomain, Value: tombstone.Domain},
+			{Key: model.ColumnProject, Value: tombstone.Project},
 		}
 		filter = append(filter, dFilter)
 	}
diff --git a/server/bootstrap/bootstrap.go b/server/bootstrap/bootstrap.go
index 94153ab..46cb5b9 100644
--- a/server/bootstrap/bootstrap.go
+++ b/server/bootstrap/bootstrap.go
@@ -59,9 +59,6 @@ import (
 	//module 'admin'
 	_ "github.com/apache/servicecomb-service-center/server/rest/admin"
 
-	//module 'syncer'
-	_ "github.com/apache/servicecomb-service-center/server/rest/syncer"
-
 	//governance
 	_ "github.com/apache/servicecomb-service-center/server/service/gov/kie"
 
diff --git a/server/rest/syncer/service/service.go b/server/rest/syncer/service/service.go
deleted file mode 100644
index bf1c444..0000000
--- a/server/rest/syncer/service/service.go
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package service
-
-import (
-	"context"
-	"net/http"
-
-	"github.com/apache/servicecomb-service-center/pkg/log"
-	"github.com/apache/servicecomb-service-center/server/syncernotify"
-	"github.com/gorilla/websocket"
-)
-
-var (
-	ServiceAPI = &Service{}
-)
-
-type Service struct {
-}
-
-func (service *Service) WatchInstance(w http.ResponseWriter, r *http.Request) {
-	var upgrader = websocket.Upgrader{
-		CheckOrigin: func(r *http.Request) bool {
-			return true
-		},
-	}
-	conn, err := upgrader.Upgrade(w, r, nil)
-	if err != nil {
-		log.Error("upgrade failed", err)
-		return
-	}
-	defer conn.Close()
-
-	syncernotify.DoWebSocketWatch(context.Background(), conn)
-}
diff --git a/server/rest/syncer/service/service_test.go b/server/rest/syncer/service/service_test.go
deleted file mode 100644
index ed31141..0000000
--- a/server/rest/syncer/service/service_test.go
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package service_test
-
-// initialize
-import (
-	"context"
-	"io"
-	"net/http"
-	"net/http/httptest"
-	"os"
-	"path/filepath"
-	"testing"
-
-	"github.com/apache/servicecomb-service-center/pkg/util"
-	"github.com/apache/servicecomb-service-center/server/config"
-	"github.com/apache/servicecomb-service-center/server/rest/syncer"
-	"github.com/apache/servicecomb-service-center/server/rest/syncer/service"
-	_ "github.com/apache/servicecomb-service-center/test"
-	"github.com/go-chassis/go-archaius"
-	"github.com/stretchr/testify/assert"
-)
-
-type mockSyncerHandler struct {
-	Func func(w http.ResponseWriter, r *http.Request)
-}
-
-func (m *mockSyncerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	m.Func(w, r)
-}
-
-func TestSyncerService_WatchInstance(t *testing.T) {
-	//create fake config file for test
-	defer archaius.Clean()
-	b := []byte(`
-syncer:
-enabled: true
-`)
-	dir := filepath.Join(util.GetAppRoot(), "conf")
-	defer os.Remove(dir)
-	os.Mkdir(dir, 0750)
-	file := filepath.Join(dir, "app.yaml")
-	defer os.Remove(file)
-	f1, err := os.Create(file)
-	assert.NoError(t, err)
-	_, err = io.WriteString(f1, string(b))
-	assert.NoError(t, err)
-	config.Init()
-	assert.NoError(t, err)
-
-	svr := httptest.NewServer(&mockSyncerHandler{func(w http.ResponseWriter, r *http.Request) {
-		ctrl := &syncer.Controller{}
-		ctrl.WatchInstance(w, r.WithContext(getContext()))
-	}})
-	defer svr.Close()
-
-	// error situation test
-	w := httptest.NewRecorder()
-	r := &http.Request{}
-	service.ServiceAPI.WatchInstance(w, r.WithContext(getContext()))
-}
-
-func getContext() context.Context {
-	return util.SetContext(
-		util.SetDomainProject(context.Background(), "default", "default"),
-		util.CtxNocache, "1")
-}
diff --git a/server/rest/syncer/syncer.go b/server/rest/syncer/syncer.go
deleted file mode 100644
index 6812168..0000000
--- a/server/rest/syncer/syncer.go
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package syncer
-
-import (
-	roa "github.com/apache/servicecomb-service-center/pkg/rest"
-	"github.com/apache/servicecomb-service-center/server/config"
-)
-
-func init() {
-	registerREST()
-}
-
-func registerREST() {
-	syncerEnabled := config.GetBool("syncer.enabled", false)
-	if syncerEnabled {
-		roa.RegisterServant(&Controller{})
-	}
-}
diff --git a/server/rest/syncer/syncer_controller.go b/server/rest/syncer/syncer_controller.go
deleted file mode 100644
index 6d6ba6f..0000000
--- a/server/rest/syncer/syncer_controller.go
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package syncer
-
-import (
-	"net/http"
-
-	"github.com/apache/servicecomb-service-center/server/rest/syncer/service"
-
-	"github.com/apache/servicecomb-service-center/pkg/rest"
-)
-
-// Syncer 有关的接口
-type Controller struct {
-}
-
-// URLPatterns 路由
-func (ctrl *Controller) URLPatterns() []rest.Route {
-	return []rest.Route{
-		{Method: http.MethodGet, Path: "/v4/syncer/watch", Func: ctrl.WatchInstance},
-	}
-}
-
-func (ctrl *Controller) WatchInstance(w http.ResponseWriter, r *http.Request) {
-	service.ServiceAPI.WatchInstance(w, r)
-}
diff --git a/server/server.go b/server/server.go
index a26703f..fe7672c 100644
--- a/server/server.go
+++ b/server/server.go
@@ -34,7 +34,6 @@ import (
 	"github.com/apache/servicecomb-service-center/server/plugin/security/tlsconf"
 	"github.com/apache/servicecomb-service-center/server/service/gov"
 	"github.com/apache/servicecomb-service-center/server/service/rbac"
-	snf "github.com/apache/servicecomb-service-center/server/syncernotify"
 	"github.com/go-chassis/foundation/gopool"
 	"github.com/little-cui/etcdadpt"
 )
@@ -54,10 +53,9 @@ type endpoint struct {
 }
 
 type ServiceCenterServer struct {
-	Endpoint            endpoint
-	APIServer           *APIServer
-	eventCenter         *nf.BusService
-	syncerNotifyService *snf.Service
+	Endpoint    endpoint
+	APIServer   *APIServer
+	eventCenter *nf.BusService
 }
 
 func (s *ServiceCenterServer) Run() {
@@ -87,7 +85,6 @@ func (s *ServiceCenterServer) initialize() {
 	s.initDatasource()
 	s.APIServer = GetAPIServer()
 	s.eventCenter = event.Center()
-	s.syncerNotifyService = snf.GetSyncerNotifyCenter()
 }
 
 func (s *ServiceCenterServer) initEndpoints() {
@@ -167,12 +164,6 @@ func (s *ServiceCenterServer) startServices() {
 	// notifications
 	s.eventCenter.Start()
 
-	// notify syncer
-	syncerEnabled := config.GetBool("syncer.enabled", false)
-	if syncerEnabled {
-		s.syncerNotifyService.Start()
-	}
-
 	// load server plugins
 	plugin.LoadPlugins()
 	rbac.Init()
@@ -203,10 +194,6 @@ func (s *ServiceCenterServer) Stop() {
 		s.eventCenter.Stop()
 	}
 
-	if s.syncerNotifyService != nil {
-		s.syncerNotifyService.Stop()
-	}
-
 	gopool.CloseAndWait()
 
 	log.Warn("service center stopped")
diff --git a/server/syncernotify/common.go b/server/syncernotify/common.go
deleted file mode 100644
index b92c89a..0000000
--- a/server/syncernotify/common.go
+++ /dev/null
@@ -1,26 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package syncernotify
-
-import "time"
-
-const (
-	HeartbeatInterval      = 30 * time.Second
-	ReadTimeout            = HeartbeatInterval * 4
-	SendTimeout            = 5 * time.Second
-	InstanceEventQueueSize = 5000
-	ReadMaxBody            = 64
-)
diff --git a/server/syncernotify/notify_service.go b/server/syncernotify/notify_service.go
deleted file mode 100644
index 8cbba01..0000000
--- a/server/syncernotify/notify_service.go
+++ /dev/null
@@ -1,83 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package syncernotify
-
-import (
-	"fmt"
-	"sync"
-
-	pb "github.com/apache/servicecomb-service-center/pkg/dump"
-	"github.com/apache/servicecomb-service-center/pkg/log"
-)
-
-var syncerNotifyService *Service
-
-func init() {
-	syncerNotifyService = NewSyncerNotifyService()
-}
-
-func GetSyncerNotifyCenter() *Service {
-	return syncerNotifyService
-}
-
-type Service struct {
-	instEventCh chan *pb.WatchInstanceChangedEvent
-	mux         sync.RWMutex
-	isClose     bool
-}
-
-func NewSyncerNotifyService() *Service {
-	return &Service{
-		instEventCh: make(chan *pb.WatchInstanceChangedEvent, InstanceEventQueueSize),
-		isClose:     true,
-	}
-}
-
-func (s *Service) AddEvent(event *pb.WatchInstanceChangedEvent) {
-	s.instEventCh <- event
-	log.Debug(fmt.Sprintf("add instance event to instance event channel, instEventCh len is: %d", len(s.instEventCh)))
-}
-
-func (s *Service) Start() {
-	if !s.Closed() {
-		log.Warn("syncer notify service is already running")
-		return
-	}
-
-	s.mux.Lock()
-	s.isClose = false
-	s.mux.Unlock()
-
-	log.Debug("syncer notify service is started")
-}
-
-func (s *Service) Closed() (b bool) {
-	s.mux.RLock()
-	b = s.isClose
-	s.mux.RUnlock()
-	return b
-}
-
-func (s *Service) Stop() {
-	if s.Closed() {
-		return
-	}
-	s.mux.Lock()
-	s.isClose = true
-	s.mux.Unlock()
-
-	log.Debug("syncer notify service stopped")
-}
diff --git a/server/syncernotify/syncer_publisher.go b/server/syncernotify/syncer_publisher.go
deleted file mode 100644
index 6e14c1f..0000000
--- a/server/syncernotify/syncer_publisher.go
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package syncernotify
-
-import (
-	"context"
-	"fmt"
-	"time"
-
-	"github.com/apache/servicecomb-service-center/pkg/goutil"
-	"github.com/apache/servicecomb-service-center/pkg/log"
-	"github.com/apache/servicecomb-service-center/server/alarm"
-	"github.com/go-chassis/foundation/gopool"
-)
-
-var publisher *Publisher
-
-func init() {
-	publisher = NewPublisher()
-	publisher.Run()
-}
-
-type Publisher struct {
-	ws        *WebSocket
-	goroutine *gopool.Pool
-}
-
-func (wh *Publisher) Run() {
-	gopool.Go(publisher.loop)
-}
-
-func (wh *Publisher) Stop() {
-	wh.goroutine.Close(true)
-}
-
-func (wh *Publisher) dispatch(ws *WebSocket, payload interface{}) {
-	wh.goroutine.Do(func(ctx context.Context) {
-		ws.HandleWatchWebSocketJob(payload)
-	})
-}
-
-func (wh *Publisher) loop(ctx context.Context) {
-	defer wh.Stop()
-	ticker := time.NewTicker(500 * time.Millisecond)
-	for {
-		select {
-		case <-ctx.Done():
-			// server shutdown
-			return
-		case <-ticker.C:
-			wsIsActive := true
-			if wh.ws != nil {
-				if payload := wh.ws.Pick(); payload != nil {
-					if _, ok := payload.(error); ok {
-						wsIsActive = false
-					}
-					wh.dispatch(wh.ws, payload)
-				}
-			}
-			if !wsIsActive {
-				log.Debug(fmt.Sprintf("release websocket conn :%s", wh.ws.conn.RemoteAddr()))
-
-				err := wh.ws.conn.Close()
-				if err != nil {
-					log.Error("conn close failed", err)
-				}
-
-				wh.ws = nil
-
-				err = alarm.Raise(alarm.IDWebsocketOfScSyncerLost, alarm.AdditionalContext("%v", err))
-				if err != nil {
-					log.Error("alarm error", err)
-				}
-			}
-		}
-	}
-}
-
-func (wh *Publisher) Accept(ws *WebSocket) {
-	log.Debug(fmt.Sprintf("get a new websocket:%s", ws.conn.RemoteAddr()))
-	wh.ws = ws
-}
-
-func NewPublisher() *Publisher {
-	return &Publisher{
-		goroutine: goutil.New(),
-	}
-}
-
-func Instance() *Publisher {
-	return publisher
-}
diff --git a/server/syncernotify/websocket.go b/server/syncernotify/websocket.go
deleted file mode 100644
index 9522f3d..0000000
--- a/server/syncernotify/websocket.go
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package syncernotify
-
-import (
-	"context"
-	"encoding/json"
-	"fmt"
-	"time"
-
-	pb "github.com/apache/servicecomb-service-center/pkg/dump"
-	"github.com/apache/servicecomb-service-center/pkg/log"
-	"github.com/apache/servicecomb-service-center/pkg/util"
-	"github.com/gorilla/websocket"
-)
-
-type WebSocket struct {
-	ctx    context.Context
-	conn   *websocket.Conn
-	err    error
-	free   chan struct{}
-	closed chan struct{}
-}
-
-func DoWebSocketWatch(ctx context.Context, conn *websocket.Conn) {
-	log.Debug("begin do websocket watch")
-
-	socket := NewWebSocket(ctx, conn)
-
-	process(socket)
-}
-
-func NewWebSocket(ctx context.Context, conn *websocket.Conn) *WebSocket {
-	return &WebSocket{
-		ctx:  ctx,
-		conn: conn,
-	}
-}
-
-func process(socket *WebSocket) {
-	socket.Init()
-
-	socket.HandleWatchWebSocketControlMessage()
-
-	socket.Stop()
-}
-
-func (wh *WebSocket) Init() {
-	wh.free = make(chan struct{}, 1)
-	wh.closed = make(chan struct{})
-	wh.SetReady()
-	remoteAddr := wh.conn.RemoteAddr().String()
-
-	Instance().Accept(wh)
-	log.Debug(fmt.Sprintf("start watching instance status, watcher[%s]", remoteAddr))
-}
-
-func (wh *WebSocket) Pick() interface{} {
-	select {
-	case <-wh.Ready():
-		if wh.Err() != nil {
-			return wh.Err()
-		}
-
-		select {
-		case e := <-GetSyncerNotifyCenter().instEventCh:
-			return e
-		default:
-			// reset if idle
-			wh.SetReady()
-		}
-	default:
-	}
-	return nil
-}
-
-func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) {
-	defer wh.SetReady()
-
-	var (
-		message    []byte
-		remoteAddr = wh.conn.RemoteAddr().String()
-	)
-
-	switch o := o.(type) {
-	// error will be set in HandleWatchWebSocketControlMessage
-	case error:
-		log.Error(fmt.Sprintf("watcher[%s] catch an err,", remoteAddr), o)
-
-		message = util.StringToBytesWithNoCopy(fmt.Sprintf("watcher catch an err: %s", o.Error()))
-	// InstanceChangedEvent will be set in OnEvent
-	case *pb.WatchInstanceChangedEvent:
-		resp := o
-
-		log.Info(fmt.Sprintf("event[%s] is coming in, watcher[%s]", resp.Action, remoteAddr))
-
-		data, err := json.Marshal(resp)
-		if err != nil {
-			log.Error(fmt.Sprintf("watcher[%s] marshal response failed", remoteAddr), err)
-			message = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output file error, %s", err.Error()))
-			break
-		}
-		message = data
-	default:
-		log.Error(fmt.Sprintf("watcher[%s] unknown input %v", remoteAddr, o), nil)
-		return
-	}
-
-	select {
-	case <-wh.closed:
-		return
-	default:
-	}
-
-	err := wh.WriteMessage(message)
-
-	if err != nil {
-		log.Error(fmt.Sprintf("watcher[%s] catch an err", remoteAddr), err)
-	}
-}
-
-func (wh *WebSocket) ReadTimeout() time.Duration {
-	return ReadTimeout
-}
-
-func (wh *WebSocket) SendTimeout() time.Duration {
-	return SendTimeout
-}
-
-func (wh *WebSocket) Heartbeat(messageType int) error {
-	err := wh.conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout()))
-	if err != nil {
-		messageTypeName := "Ping"
-		if messageType == websocket.PongMessage {
-			messageTypeName = "Pong"
-		}
-		log.Error(fmt.Sprintf("fail to send '%s' to watcher[%s]", messageTypeName, wh.conn.RemoteAddr()), err)
-		return err
-	}
-	return nil
-}
-
-func (wh *WebSocket) HandleWatchWebSocketControlMessage() {
-	remoteAddr := wh.conn.RemoteAddr().String()
-	// PING
-	wh.conn.SetPingHandler(func(message string) error {
-		defer func() {
-			err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
-			if err != nil {
-				log.Error("", err)
-			}
-		}()
-		log.Debug(fmt.Sprintf("received 'Ping' message '%s' from watcher[%s]", message, remoteAddr))
-		return wh.Heartbeat(websocket.PongMessage)
-	})
-	// PONG
-	wh.conn.SetPongHandler(func(message string) error {
-		defer func() {
-			err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
-			if err != nil {
-				log.Error("", err)
-			}
-		}()
-		log.Debug(fmt.Sprintf("received 'Pong' message '%s' from watcher[%s]", message, remoteAddr))
-		return nil
-	})
-	// CLOSE
-	wh.conn.SetCloseHandler(func(code int, text string) error {
-		log.Info(fmt.Sprintf("watcher[%s] active closed, code: %d, message: '%s'", remoteAddr, code, text))
-		return wh.sendClose(code, text)
-	})
-
-	wh.conn.SetReadLimit(ReadMaxBody)
-	err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
-	if err != nil {
-		log.Error("", err)
-	}
-	for {
-		_, _, err := wh.conn.ReadMessage()
-		if err != nil {
-			// client close or conn broken
-			wh.SetError(err)
-			return
-		}
-	}
-}
-
-func (wh *WebSocket) sendClose(code int, text string) error {
-	remoteAddr := wh.conn.RemoteAddr().String()
-	var message []byte
-	if code != websocket.CloseNoStatusReceived {
-		message = websocket.FormatCloseMessage(code, text)
-	}
-	err := wh.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(wh.SendTimeout()))
-	if err != nil {
-		log.Error(fmt.Sprintf("watcher[%s] catch an err", remoteAddr), err)
-		return err
-	}
-	return nil
-}
-
-func (wh *WebSocket) WriteMessage(message []byte) error {
-	err := wh.conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout()))
-	if err != nil {
-		return err
-	}
-	return wh.conn.WriteMessage(websocket.TextMessage, message)
-}
-
-func (wh *WebSocket) Ready() <-chan struct{} {
-	return wh.free
-}
-
-func (wh *WebSocket) SetReady() {
-	select {
-	case wh.free <- struct{}{}:
-	default:
-	}
-}
-
-func (wh *WebSocket) Stop() {
-	close(wh.closed)
-}
-
-func (wh *WebSocket) SetError(e error) {
-	wh.err = e
-}
-
-func (wh *WebSocket) Err() error {
-	return wh.err
-}
diff --git a/server/syncernotify/websocket_test.go b/server/syncernotify/websocket_test.go
deleted file mode 100644
index 9dd77d6..0000000
--- a/server/syncernotify/websocket_test.go
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package syncernotify_test
-
-// initialize
-import (
-	"context"
-	"errors"
-	"net/http"
-	"net/http/httptest"
-	"strings"
-	"testing"
-	"time"
-
-	"github.com/apache/servicecomb-service-center/pkg/dump"
-	"github.com/apache/servicecomb-service-center/server/core"
-	. "github.com/apache/servicecomb-service-center/server/syncernotify"
-	"github.com/go-chassis/cari/discovery"
-	"github.com/gorilla/websocket"
-	"github.com/stretchr/testify/assert"
-)
-
-var closeCh = make(chan struct{})
-
-type watcherConn struct {
-}
-
-func init() {
-	testing.Init()
-	core.Initialize()
-}
-
-func (h *watcherConn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	var upgrader = websocket.Upgrader{}
-	conn, _ := upgrader.Upgrade(w, r, nil)
-	for {
-		conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
-		conn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(time.Second))
-		_, _, err := conn.ReadMessage()
-		if err != nil {
-			return
-		}
-		<-closeCh
-		conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
-		conn.Close()
-		return
-	}
-}
-
-func TestDoWebSocketWatch(t *testing.T) {
-	s := httptest.NewServer(&watcherConn{})
-
-	conn, _, _ := websocket.DefaultDialer.Dial(
-		strings.Replace(s.URL, "http://", "ws://", 1), nil)
-	//fmt.Print(conn)
-	ws := NewWebSocket(context.Background(), conn)
-	ws.Init()
-
-	t.Run("start syncer center", func(t *testing.T) {
-
-		GetSyncerNotifyCenter().Start()
-
-		go DoWebSocketWatch(context.Background(), conn)
-
-		go ws.HandleWatchWebSocketControlMessage()
-
-	})
-
-	t.Run("handle websocket job", func(t *testing.T) {
-		// 1. add event to channel, handle event
-		instanceEvent := &dump.WatchInstanceChangedEvent{
-			Action: "CREATE",
-			Service: &dump.Microservice{
-				KV: &dump.KV{
-					Key:         "/cse-sr/ms/files/default/default/4042a6a3e5a2893698ae363ea99a69eb63fc51cd",
-					Rev:         12,
-					ClusterName: "clustername",
-				},
-				Value: &discovery.MicroService{
-					ServiceId:   "4042a6a3e5a2893698ae363ea99a69eb63fc51cd",
-					AppId:       "default",
-					ServiceName: "TEST01",
-					Version:     "0.0.1",
-					Level:       "BACK",
-					Schemas: []string{
-						"servicecenter.grpc.api.ServiceCtrl",
-						"servicecenter.grpc.api.ServiceInstanceCtrl",
-					},
-				},
-			},
-			Instance: &dump.Instance{
-				KV: &dump.KV{
-					Key:         "/cse-sr/inst/files/default/default/4042a6a3e5a2893698ae363ea99a69eb63fc51cd/7a6be9f861a811e9b3f6fa163eca30e0",
-					Rev:         21,
-					ClusterName: "clustername",
-				},
-				Value: &discovery.MicroServiceInstance{
-					InstanceId: "8e0fe4b961a811e981a6fa163e86b81a",
-					ServiceId:  "4042a6a3e5a2893698ae363ea99a69eb63fc51cd",
-					Endpoints: []string{
-						"rest://192.168.88.109:30100/",
-					},
-					HostName: "sunlisen",
-				},
-			},
-		}
-
-		GetSyncerNotifyCenter().AddEvent(instanceEvent)
-
-		<-time.After(time.Second)
-
-		// 2. handle unknown things
-		ws.HandleWatchWebSocketJob(nil)
-
-		// 3. handle error
-		fakeErr := errors.New("fake error")
-		ws.HandleWatchWebSocketJob(fakeErr)
-
-		// 4. heartbeat
-		err := ws.Heartbeat(websocket.PingMessage)
-		assert.NoError(t, err)
-		err = ws.Heartbeat(websocket.PongMessage)
-		assert.NoError(t, err)
-
-		closeCh <- struct{}{}
-
-		<-time.After(time.Second)
-
-		err = ws.Heartbeat(websocket.PingMessage)
-		assert.Error(t, err)
-		err = ws.Heartbeat(websocket.PongMessage)
-		assert.Error(t, err)
-
-	})
-
-	Instance().Stop()
-	GetSyncerNotifyCenter().Stop()
-}