You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2022/01/21 07:37:26 UTC
[servicecomb-service-center] branch master updated: Bug: remove syncer.enabled (#1240)
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 57e24f7 Bug: remove syncer.enabled (#1240)
57e24f7 is described below
commit 57e24f71ba874c03462a79c41eb74bccb82a3e94
Author: little-cui <su...@qq.com>
AuthorDate: Fri Jan 21 15:37:19 2022 +0800
Bug: remove syncer.enabled (#1240)
---
datasource/etcd/event/instance_event_handler.go | 33 ---
datasource/mongo/event/instance_event_handler.go | 43 ----
.../mongo/event/instance_event_handler_test.go | 16 --
etc/conf/app.yaml | 3 -
eventbase/bootstrap/bootstrap.go | 1 +
eventbase/datasource/mongo/task/task_dao.go | 8 +-
.../datasource/mongo/tombstone/tombstone_dao.go | 8 +-
server/bootstrap/bootstrap.go | 3 -
server/rest/syncer/service/service.go | 50 -----
server/rest/syncer/service/service_test.go | 81 -------
server/rest/syncer/syncer.go | 34 ---
server/rest/syncer/syncer_controller.go | 41 ----
server/server.go | 19 +-
server/syncernotify/common.go | 26 ---
server/syncernotify/notify_service.go | 83 -------
server/syncernotify/syncer_publisher.go | 107 ---------
server/syncernotify/websocket.go | 246 ---------------------
server/syncernotify/websocket_test.go | 152 -------------
18 files changed, 12 insertions(+), 942 deletions(-)
diff --git a/datasource/etcd/event/instance_event_handler.go b/datasource/etcd/event/instance_event_handler.go
index c3b4a4c..a83db1c 100644
--- a/datasource/etcd/event/instance_event_handler.go
+++ b/datasource/etcd/event/instance_event_handler.go
@@ -27,20 +27,13 @@ import (
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
- "github.com/apache/servicecomb-service-center/pkg/dump"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/event"
quotasvc "github.com/apache/servicecomb-service-center/server/service/quota"
- "github.com/apache/servicecomb-service-center/server/syncernotify"
pb "github.com/go-chassis/cari/discovery"
)
-const (
- msKeyPrefix = "/cse-sr/ms/files/"
- sep = "/"
-)
-
// InstanceEventHandler is the handler to handle:
// 1. report instance metrics
// 2. recover the instance quota
@@ -80,10 +73,6 @@ func (h *InstanceEventHandler) OnEvent(evt kvstore.Event) {
return
}
- if !syncernotify.GetSyncerNotifyCenter().Closed() {
- NotifySyncerInstanceEvent(evt, domainProject, ms)
- }
-
if event.Center().Closed() {
log.Warn(fmt.Sprintf("caught [%s] instance[%s/%s] event, endpoints %v, but notify service is closed",
action, providerID, providerInstanceID, instance.Endpoints))
@@ -129,25 +118,3 @@ func PublishInstanceEvent(evt kvstore.Event, serviceKey *pb.MicroServiceKey, sub
}
}
}
-
-func NotifySyncerInstanceEvent(evt kvstore.Event, domainProject string, ms *pb.MicroService) {
- msInstance := evt.KV.Value.(*pb.MicroServiceInstance)
-
- serviceKey := msKeyPrefix + domainProject + sep + ms.ServiceId
- msKV := &dump.KV{Key: serviceKey, ClusterName: evt.KV.ClusterName}
- service := &dump.Microservice{KV: msKV, Value: ms}
-
- instKey := string(evt.KV.Key)
- instKV := &dump.KV{Key: instKey, ClusterName: evt.KV.ClusterName}
- instance := &dump.Instance{KV: instKV, Value: msInstance}
-
- instEvent := &dump.WatchInstanceChangedEvent{
- Action: string(evt.Type),
- Service: service,
- Instance: instance,
- }
-
- syncernotify.GetSyncerNotifyCenter().AddEvent(instEvent)
-
- log.Debug(fmt.Sprintf("success to add instance change event: %v to event queue", instEvent))
-}
diff --git a/datasource/mongo/event/instance_event_handler.go b/datasource/mongo/event/instance_event_handler.go
index a14fd1b..40ce510 100644
--- a/datasource/mongo/event/instance_event_handler.go
+++ b/datasource/mongo/event/instance_event_handler.go
@@ -28,12 +28,10 @@ import (
"github.com/apache/servicecomb-service-center/datasource/mongo"
"github.com/apache/servicecomb-service-center/datasource/mongo/model"
"github.com/apache/servicecomb-service-center/datasource/mongo/sd"
- "github.com/apache/servicecomb-service-center/pkg/dump"
"github.com/apache/servicecomb-service-center/pkg/log"
simple "github.com/apache/servicecomb-service-center/pkg/time"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/event"
- "github.com/apache/servicecomb-service-center/server/syncernotify"
)
// InstanceEventHandler is the handler to handle events
@@ -69,9 +67,6 @@ func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
if action == discovery.EVT_INIT {
return
}
- if !syncernotify.GetSyncerNotifyCenter().Closed() {
- NotifySyncerInstanceEvent(evt, microService)
- }
consumerIDs, err := mongo.GetConsumerIDs(ctx, microService)
if err != nil {
log.Error(fmt.Sprintf("get service[%s][%s/%s/%s/%s]'s consumerIDs failed",
@@ -102,41 +97,3 @@ func PublishInstanceEvent(evt sd.MongoEvent, serviceKey *discovery.MicroServiceK
}
}
}
-
-func NotifySyncerInstanceEvent(event sd.MongoEvent, microService *discovery.MicroService) {
- instance := event.Value.(model.Instance).Instance
- log.Info(fmt.Sprintf("instanceId : %s and serviceId : %s in NotifySyncerInstanceEvent", instance.InstanceId, instance.ServiceId))
- instanceKey := util.StringJoin([]string{datasource.InstanceKeyPrefix, event.Value.(model.Instance).Domain,
- event.Value.(model.Instance).Project, instance.ServiceId, instance.InstanceId}, datasource.SPLIT)
-
- instanceKv := dump.KV{
- Key: instanceKey,
- Value: instance,
- }
-
- dumpInstance := dump.Instance{
- KV: &instanceKv,
- Value: instance,
- }
- serviceKey := util.StringJoin([]string{datasource.ServiceKeyPrefix, event.Value.(model.Instance).Domain,
- event.Value.(model.Instance).Project, instance.ServiceId}, datasource.SPLIT)
- serviceKv := dump.KV{
- Key: serviceKey,
- Value: microService,
- }
-
- dumpService := dump.Microservice{
- KV: &serviceKv,
- Value: microService,
- }
-
- instEvent := &dump.WatchInstanceChangedEvent{
- Action: string(event.Type),
- Service: &dumpService,
- Instance: &dumpInstance,
- }
- syncernotify.GetSyncerNotifyCenter().AddEvent(instEvent)
-
- log.Debug(fmt.Sprintf("success to add instance change event action [%s], instanceKey : %s to event queue",
- instEvent.Action, instanceKey))
-}
diff --git a/datasource/mongo/event/instance_event_handler_test.go b/datasource/mongo/event/instance_event_handler_test.go
index 076f57f..8e53077 100644
--- a/datasource/mongo/event/instance_event_handler_test.go
+++ b/datasource/mongo/event/instance_event_handler_test.go
@@ -28,7 +28,6 @@ import (
"github.com/apache/servicecomb-service-center/datasource/mongo/event"
"github.com/apache/servicecomb-service-center/datasource/mongo/model"
"github.com/apache/servicecomb-service-center/datasource/mongo/sd"
- "github.com/apache/servicecomb-service-center/server/syncernotify"
_ "github.com/apache/servicecomb-service-center/test"
)
@@ -49,21 +48,6 @@ func TestInstanceEventHandler_OnEvent(t *testing.T) {
h.OnEvent(mongoAssign())
assert.Error(t, assert.AnError)
})
- t.Run("OnEvent test when syncer notify center open", func(t *testing.T) {
- syncernotify.GetSyncerNotifyCenter().Start()
- h := event.InstanceEventHandler{}
- h.OnEvent(mongoAssign())
- assert.Equal(t, false, t.Failed(), "add event succeed")
- })
-}
-
-func TestNotifySyncerInstanceEvent(t *testing.T) {
- t.Run("test when data is ok", func(t *testing.T) {
- mongoEvent := mongoAssign()
- microService := getMicroService()
- event.NotifySyncerInstanceEvent(mongoEvent, microService)
- assert.Equal(t, false, t.Failed())
- })
}
func mongoAssign() sd.MongoEvent {
diff --git a/etc/conf/app.yaml b/etc/conf/app.yaml
index d0700ae..ed039f4 100644
--- a/etc/conf/app.yaml
+++ b/etc/conf/app.yaml
@@ -220,9 +220,6 @@ auth:
auditlog:
kind:
-syncer:
- enabled: false
-
heartbeat:
# configuration of websocket long connection
websocket:
diff --git a/eventbase/bootstrap/bootstrap.go b/eventbase/bootstrap/bootstrap.go
index 977089f..c304b12 100644
--- a/eventbase/bootstrap/bootstrap.go
+++ b/eventbase/bootstrap/bootstrap.go
@@ -18,6 +18,7 @@
package bootstrap
import (
+ // datasource implements
_ "github.com/apache/servicecomb-service-center/eventbase/datasource/etcd"
_ "github.com/apache/servicecomb-service-center/eventbase/datasource/mongo"
)
diff --git a/eventbase/datasource/mongo/task/task_dao.go b/eventbase/datasource/mongo/task/task_dao.go
index 8d6c180..bf3798e 100644
--- a/eventbase/datasource/mongo/task/task_dao.go
+++ b/eventbase/datasource/mongo/task/task_dao.go
@@ -65,10 +65,10 @@ func (d *Dao) Delete(ctx context.Context, tasks ...*sync.Task) error {
for i, task := range tasks {
tasksIDs[i] = task.ID
dFilter := bson.D{
- {model.ColumnDomain, task.Domain},
- {model.ColumnProject, task.Project},
- {model.ColumnID, task.ID},
- {model.ColumnTimestamp, task.Timestamp},
+ {Key: model.ColumnDomain, Value: task.Domain},
+ {Key: model.ColumnProject, Value: task.Project},
+ {Key: model.ColumnID, Value: task.ID},
+ {Key: model.ColumnTimestamp, Value: task.Timestamp},
}
filter = append(filter, dFilter)
}
diff --git a/eventbase/datasource/mongo/tombstone/tombstone_dao.go b/eventbase/datasource/mongo/tombstone/tombstone_dao.go
index 85621fc..891571f 100644
--- a/eventbase/datasource/mongo/tombstone/tombstone_dao.go
+++ b/eventbase/datasource/mongo/tombstone/tombstone_dao.go
@@ -68,10 +68,10 @@ func (d *Dao) Delete(ctx context.Context, tombstones ...*sync.Tombstone) error {
for i, tombstone := range tombstones {
tombstonesIDs[i] = tombstone.ResourceID
dFilter := bson.D{
- {model.ColumnResourceID, tombstone.ResourceID},
- {model.ColumnResourceType, tombstone.ResourceType},
- {model.ColumnDomain, tombstone.Domain},
- {model.ColumnProject, tombstone.Project},
+ {Key: model.ColumnResourceID, Value: tombstone.ResourceID},
+ {Key: model.ColumnResourceType, Value: tombstone.ResourceType},
+ {Key: model.ColumnDomain, Value: tombstone.Domain},
+ {Key: model.ColumnProject, Value: tombstone.Project},
}
filter = append(filter, dFilter)
}
diff --git a/server/bootstrap/bootstrap.go b/server/bootstrap/bootstrap.go
index 94153ab..46cb5b9 100644
--- a/server/bootstrap/bootstrap.go
+++ b/server/bootstrap/bootstrap.go
@@ -59,9 +59,6 @@ import (
//module 'admin'
_ "github.com/apache/servicecomb-service-center/server/rest/admin"
- //module 'syncer'
- _ "github.com/apache/servicecomb-service-center/server/rest/syncer"
-
//governance
_ "github.com/apache/servicecomb-service-center/server/service/gov/kie"
diff --git a/server/rest/syncer/service/service.go b/server/rest/syncer/service/service.go
deleted file mode 100644
index bf1c444..0000000
--- a/server/rest/syncer/service/service.go
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package service
-
-import (
- "context"
- "net/http"
-
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/server/syncernotify"
- "github.com/gorilla/websocket"
-)
-
-var (
- ServiceAPI = &Service{}
-)
-
-type Service struct {
-}
-
-func (service *Service) WatchInstance(w http.ResponseWriter, r *http.Request) {
- var upgrader = websocket.Upgrader{
- CheckOrigin: func(r *http.Request) bool {
- return true
- },
- }
- conn, err := upgrader.Upgrade(w, r, nil)
- if err != nil {
- log.Error("upgrade failed", err)
- return
- }
- defer conn.Close()
-
- syncernotify.DoWebSocketWatch(context.Background(), conn)
-}
diff --git a/server/rest/syncer/service/service_test.go b/server/rest/syncer/service/service_test.go
deleted file mode 100644
index ed31141..0000000
--- a/server/rest/syncer/service/service_test.go
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package service_test
-
-// initialize
-import (
- "context"
- "io"
- "net/http"
- "net/http/httptest"
- "os"
- "path/filepath"
- "testing"
-
- "github.com/apache/servicecomb-service-center/pkg/util"
- "github.com/apache/servicecomb-service-center/server/config"
- "github.com/apache/servicecomb-service-center/server/rest/syncer"
- "github.com/apache/servicecomb-service-center/server/rest/syncer/service"
- _ "github.com/apache/servicecomb-service-center/test"
- "github.com/go-chassis/go-archaius"
- "github.com/stretchr/testify/assert"
-)
-
-type mockSyncerHandler struct {
- Func func(w http.ResponseWriter, r *http.Request)
-}
-
-func (m *mockSyncerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- m.Func(w, r)
-}
-
-func TestSyncerService_WatchInstance(t *testing.T) {
- //create fake config file for test
- defer archaius.Clean()
- b := []byte(`
-syncer:
-enabled: true
-`)
- dir := filepath.Join(util.GetAppRoot(), "conf")
- defer os.Remove(dir)
- os.Mkdir(dir, 0750)
- file := filepath.Join(dir, "app.yaml")
- defer os.Remove(file)
- f1, err := os.Create(file)
- assert.NoError(t, err)
- _, err = io.WriteString(f1, string(b))
- assert.NoError(t, err)
- config.Init()
- assert.NoError(t, err)
-
- svr := httptest.NewServer(&mockSyncerHandler{func(w http.ResponseWriter, r *http.Request) {
- ctrl := &syncer.Controller{}
- ctrl.WatchInstance(w, r.WithContext(getContext()))
- }})
- defer svr.Close()
-
- // error situation test
- w := httptest.NewRecorder()
- r := &http.Request{}
- service.ServiceAPI.WatchInstance(w, r.WithContext(getContext()))
-}
-
-func getContext() context.Context {
- return util.SetContext(
- util.SetDomainProject(context.Background(), "default", "default"),
- util.CtxNocache, "1")
-}
diff --git a/server/rest/syncer/syncer.go b/server/rest/syncer/syncer.go
deleted file mode 100644
index 6812168..0000000
--- a/server/rest/syncer/syncer.go
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package syncer
-
-import (
- roa "github.com/apache/servicecomb-service-center/pkg/rest"
- "github.com/apache/servicecomb-service-center/server/config"
-)
-
-func init() {
- registerREST()
-}
-
-func registerREST() {
- syncerEnabled := config.GetBool("syncer.enabled", false)
- if syncerEnabled {
- roa.RegisterServant(&Controller{})
- }
-}
diff --git a/server/rest/syncer/syncer_controller.go b/server/rest/syncer/syncer_controller.go
deleted file mode 100644
index 6d6ba6f..0000000
--- a/server/rest/syncer/syncer_controller.go
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package syncer
-
-import (
- "net/http"
-
- "github.com/apache/servicecomb-service-center/server/rest/syncer/service"
-
- "github.com/apache/servicecomb-service-center/pkg/rest"
-)
-
-// Syncer 有关的接口
-type Controller struct {
-}
-
-// URLPatterns 路由
-func (ctrl *Controller) URLPatterns() []rest.Route {
- return []rest.Route{
- {Method: http.MethodGet, Path: "/v4/syncer/watch", Func: ctrl.WatchInstance},
- }
-}
-
-func (ctrl *Controller) WatchInstance(w http.ResponseWriter, r *http.Request) {
- service.ServiceAPI.WatchInstance(w, r)
-}
diff --git a/server/server.go b/server/server.go
index a26703f..fe7672c 100644
--- a/server/server.go
+++ b/server/server.go
@@ -34,7 +34,6 @@ import (
"github.com/apache/servicecomb-service-center/server/plugin/security/tlsconf"
"github.com/apache/servicecomb-service-center/server/service/gov"
"github.com/apache/servicecomb-service-center/server/service/rbac"
- snf "github.com/apache/servicecomb-service-center/server/syncernotify"
"github.com/go-chassis/foundation/gopool"
"github.com/little-cui/etcdadpt"
)
@@ -54,10 +53,9 @@ type endpoint struct {
}
type ServiceCenterServer struct {
- Endpoint endpoint
- APIServer *APIServer
- eventCenter *nf.BusService
- syncerNotifyService *snf.Service
+ Endpoint endpoint
+ APIServer *APIServer
+ eventCenter *nf.BusService
}
func (s *ServiceCenterServer) Run() {
@@ -87,7 +85,6 @@ func (s *ServiceCenterServer) initialize() {
s.initDatasource()
s.APIServer = GetAPIServer()
s.eventCenter = event.Center()
- s.syncerNotifyService = snf.GetSyncerNotifyCenter()
}
func (s *ServiceCenterServer) initEndpoints() {
@@ -167,12 +164,6 @@ func (s *ServiceCenterServer) startServices() {
// notifications
s.eventCenter.Start()
- // notify syncer
- syncerEnabled := config.GetBool("syncer.enabled", false)
- if syncerEnabled {
- s.syncerNotifyService.Start()
- }
-
// load server plugins
plugin.LoadPlugins()
rbac.Init()
@@ -203,10 +194,6 @@ func (s *ServiceCenterServer) Stop() {
s.eventCenter.Stop()
}
- if s.syncerNotifyService != nil {
- s.syncerNotifyService.Stop()
- }
-
gopool.CloseAndWait()
log.Warn("service center stopped")
diff --git a/server/syncernotify/common.go b/server/syncernotify/common.go
deleted file mode 100644
index b92c89a..0000000
--- a/server/syncernotify/common.go
+++ /dev/null
@@ -1,26 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package syncernotify
-
-import "time"
-
-const (
- HeartbeatInterval = 30 * time.Second
- ReadTimeout = HeartbeatInterval * 4
- SendTimeout = 5 * time.Second
- InstanceEventQueueSize = 5000
- ReadMaxBody = 64
-)
diff --git a/server/syncernotify/notify_service.go b/server/syncernotify/notify_service.go
deleted file mode 100644
index 8cbba01..0000000
--- a/server/syncernotify/notify_service.go
+++ /dev/null
@@ -1,83 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package syncernotify
-
-import (
- "fmt"
- "sync"
-
- pb "github.com/apache/servicecomb-service-center/pkg/dump"
- "github.com/apache/servicecomb-service-center/pkg/log"
-)
-
-var syncerNotifyService *Service
-
-func init() {
- syncerNotifyService = NewSyncerNotifyService()
-}
-
-func GetSyncerNotifyCenter() *Service {
- return syncerNotifyService
-}
-
-type Service struct {
- instEventCh chan *pb.WatchInstanceChangedEvent
- mux sync.RWMutex
- isClose bool
-}
-
-func NewSyncerNotifyService() *Service {
- return &Service{
- instEventCh: make(chan *pb.WatchInstanceChangedEvent, InstanceEventQueueSize),
- isClose: true,
- }
-}
-
-func (s *Service) AddEvent(event *pb.WatchInstanceChangedEvent) {
- s.instEventCh <- event
- log.Debug(fmt.Sprintf("add instance event to instance event channel, instEventCh len is: %d", len(s.instEventCh)))
-}
-
-func (s *Service) Start() {
- if !s.Closed() {
- log.Warn("syncer notify service is already running")
- return
- }
-
- s.mux.Lock()
- s.isClose = false
- s.mux.Unlock()
-
- log.Debug("syncer notify service is started")
-}
-
-func (s *Service) Closed() (b bool) {
- s.mux.RLock()
- b = s.isClose
- s.mux.RUnlock()
- return b
-}
-
-func (s *Service) Stop() {
- if s.Closed() {
- return
- }
- s.mux.Lock()
- s.isClose = true
- s.mux.Unlock()
-
- log.Debug("syncer notify service stopped")
-}
diff --git a/server/syncernotify/syncer_publisher.go b/server/syncernotify/syncer_publisher.go
deleted file mode 100644
index 6e14c1f..0000000
--- a/server/syncernotify/syncer_publisher.go
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package syncernotify
-
-import (
- "context"
- "fmt"
- "time"
-
- "github.com/apache/servicecomb-service-center/pkg/goutil"
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/server/alarm"
- "github.com/go-chassis/foundation/gopool"
-)
-
-var publisher *Publisher
-
-func init() {
- publisher = NewPublisher()
- publisher.Run()
-}
-
-type Publisher struct {
- ws *WebSocket
- goroutine *gopool.Pool
-}
-
-func (wh *Publisher) Run() {
- gopool.Go(publisher.loop)
-}
-
-func (wh *Publisher) Stop() {
- wh.goroutine.Close(true)
-}
-
-func (wh *Publisher) dispatch(ws *WebSocket, payload interface{}) {
- wh.goroutine.Do(func(ctx context.Context) {
- ws.HandleWatchWebSocketJob(payload)
- })
-}
-
-func (wh *Publisher) loop(ctx context.Context) {
- defer wh.Stop()
- ticker := time.NewTicker(500 * time.Millisecond)
- for {
- select {
- case <-ctx.Done():
- // server shutdown
- return
- case <-ticker.C:
- wsIsActive := true
- if wh.ws != nil {
- if payload := wh.ws.Pick(); payload != nil {
- if _, ok := payload.(error); ok {
- wsIsActive = false
- }
- wh.dispatch(wh.ws, payload)
- }
- }
- if !wsIsActive {
- log.Debug(fmt.Sprintf("release websocket conn :%s", wh.ws.conn.RemoteAddr()))
-
- err := wh.ws.conn.Close()
- if err != nil {
- log.Error("conn close failed", err)
- }
-
- wh.ws = nil
-
- err = alarm.Raise(alarm.IDWebsocketOfScSyncerLost, alarm.AdditionalContext("%v", err))
- if err != nil {
- log.Error("alarm error", err)
- }
- }
- }
- }
-}
-
-func (wh *Publisher) Accept(ws *WebSocket) {
- log.Debug(fmt.Sprintf("get a new websocket:%s", ws.conn.RemoteAddr()))
- wh.ws = ws
-}
-
-func NewPublisher() *Publisher {
- return &Publisher{
- goroutine: goutil.New(),
- }
-}
-
-func Instance() *Publisher {
- return publisher
-}
diff --git a/server/syncernotify/websocket.go b/server/syncernotify/websocket.go
deleted file mode 100644
index 9522f3d..0000000
--- a/server/syncernotify/websocket.go
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package syncernotify
-
-import (
- "context"
- "encoding/json"
- "fmt"
- "time"
-
- pb "github.com/apache/servicecomb-service-center/pkg/dump"
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/util"
- "github.com/gorilla/websocket"
-)
-
-type WebSocket struct {
- ctx context.Context
- conn *websocket.Conn
- err error
- free chan struct{}
- closed chan struct{}
-}
-
-func DoWebSocketWatch(ctx context.Context, conn *websocket.Conn) {
- log.Debug("begin do websocket watch")
-
- socket := NewWebSocket(ctx, conn)
-
- process(socket)
-}
-
-func NewWebSocket(ctx context.Context, conn *websocket.Conn) *WebSocket {
- return &WebSocket{
- ctx: ctx,
- conn: conn,
- }
-}
-
-func process(socket *WebSocket) {
- socket.Init()
-
- socket.HandleWatchWebSocketControlMessage()
-
- socket.Stop()
-}
-
-func (wh *WebSocket) Init() {
- wh.free = make(chan struct{}, 1)
- wh.closed = make(chan struct{})
- wh.SetReady()
- remoteAddr := wh.conn.RemoteAddr().String()
-
- Instance().Accept(wh)
- log.Debug(fmt.Sprintf("start watching instance status, watcher[%s]", remoteAddr))
-}
-
-func (wh *WebSocket) Pick() interface{} {
- select {
- case <-wh.Ready():
- if wh.Err() != nil {
- return wh.Err()
- }
-
- select {
- case e := <-GetSyncerNotifyCenter().instEventCh:
- return e
- default:
- // reset if idle
- wh.SetReady()
- }
- default:
- }
- return nil
-}
-
-func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) {
- defer wh.SetReady()
-
- var (
- message []byte
- remoteAddr = wh.conn.RemoteAddr().String()
- )
-
- switch o := o.(type) {
- // error will be set in HandleWatchWebSocketControlMessage
- case error:
- log.Error(fmt.Sprintf("watcher[%s] catch an err,", remoteAddr), o)
-
- message = util.StringToBytesWithNoCopy(fmt.Sprintf("watcher catch an err: %s", o.Error()))
- // InstanceChangedEvent will be set in OnEvent
- case *pb.WatchInstanceChangedEvent:
- resp := o
-
- log.Info(fmt.Sprintf("event[%s] is coming in, watcher[%s]", resp.Action, remoteAddr))
-
- data, err := json.Marshal(resp)
- if err != nil {
- log.Error(fmt.Sprintf("watcher[%s] marshal response failed", remoteAddr), err)
- message = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output file error, %s", err.Error()))
- break
- }
- message = data
- default:
- log.Error(fmt.Sprintf("watcher[%s] unknown input %v", remoteAddr, o), nil)
- return
- }
-
- select {
- case <-wh.closed:
- return
- default:
- }
-
- err := wh.WriteMessage(message)
-
- if err != nil {
- log.Error(fmt.Sprintf("watcher[%s] catch an err", remoteAddr), err)
- }
-}
-
-func (wh *WebSocket) ReadTimeout() time.Duration {
- return ReadTimeout
-}
-
-func (wh *WebSocket) SendTimeout() time.Duration {
- return SendTimeout
-}
-
-func (wh *WebSocket) Heartbeat(messageType int) error {
- err := wh.conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout()))
- if err != nil {
- messageTypeName := "Ping"
- if messageType == websocket.PongMessage {
- messageTypeName = "Pong"
- }
- log.Error(fmt.Sprintf("fail to send '%s' to watcher[%s]", messageTypeName, wh.conn.RemoteAddr()), err)
- return err
- }
- return nil
-}
-
-func (wh *WebSocket) HandleWatchWebSocketControlMessage() {
- remoteAddr := wh.conn.RemoteAddr().String()
- // PING
- wh.conn.SetPingHandler(func(message string) error {
- defer func() {
- err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
- if err != nil {
- log.Error("", err)
- }
- }()
- log.Debug(fmt.Sprintf("received 'Ping' message '%s' from watcher[%s]", message, remoteAddr))
- return wh.Heartbeat(websocket.PongMessage)
- })
- // PONG
- wh.conn.SetPongHandler(func(message string) error {
- defer func() {
- err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
- if err != nil {
- log.Error("", err)
- }
- }()
- log.Debug(fmt.Sprintf("received 'Pong' message '%s' from watcher[%s]", message, remoteAddr))
- return nil
- })
- // CLOSE
- wh.conn.SetCloseHandler(func(code int, text string) error {
- log.Info(fmt.Sprintf("watcher[%s] active closed, code: %d, message: '%s'", remoteAddr, code, text))
- return wh.sendClose(code, text)
- })
-
- wh.conn.SetReadLimit(ReadMaxBody)
- err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
- if err != nil {
- log.Error("", err)
- }
- for {
- _, _, err := wh.conn.ReadMessage()
- if err != nil {
- // client close or conn broken
- wh.SetError(err)
- return
- }
- }
-}
-
-func (wh *WebSocket) sendClose(code int, text string) error {
- remoteAddr := wh.conn.RemoteAddr().String()
- var message []byte
- if code != websocket.CloseNoStatusReceived {
- message = websocket.FormatCloseMessage(code, text)
- }
- err := wh.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(wh.SendTimeout()))
- if err != nil {
- log.Error(fmt.Sprintf("watcher[%s] catch an err", remoteAddr), err)
- return err
- }
- return nil
-}
-
-func (wh *WebSocket) WriteMessage(message []byte) error {
- err := wh.conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout()))
- if err != nil {
- return err
- }
- return wh.conn.WriteMessage(websocket.TextMessage, message)
-}
-
-func (wh *WebSocket) Ready() <-chan struct{} {
- return wh.free
-}
-
-func (wh *WebSocket) SetReady() {
- select {
- case wh.free <- struct{}{}:
- default:
- }
-}
-
-func (wh *WebSocket) Stop() {
- close(wh.closed)
-}
-
-func (wh *WebSocket) SetError(e error) {
- wh.err = e
-}
-
-func (wh *WebSocket) Err() error {
- return wh.err
-}
diff --git a/server/syncernotify/websocket_test.go b/server/syncernotify/websocket_test.go
deleted file mode 100644
index 9dd77d6..0000000
--- a/server/syncernotify/websocket_test.go
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package syncernotify_test
-
-// initialize
-import (
- "context"
- "errors"
- "net/http"
- "net/http/httptest"
- "strings"
- "testing"
- "time"
-
- "github.com/apache/servicecomb-service-center/pkg/dump"
- "github.com/apache/servicecomb-service-center/server/core"
- . "github.com/apache/servicecomb-service-center/server/syncernotify"
- "github.com/go-chassis/cari/discovery"
- "github.com/gorilla/websocket"
- "github.com/stretchr/testify/assert"
-)
-
-var closeCh = make(chan struct{})
-
-type watcherConn struct {
-}
-
-func init() {
- testing.Init()
- core.Initialize()
-}
-
-func (h *watcherConn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- var upgrader = websocket.Upgrader{}
- conn, _ := upgrader.Upgrade(w, r, nil)
- for {
- conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
- conn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(time.Second))
- _, _, err := conn.ReadMessage()
- if err != nil {
- return
- }
- <-closeCh
- conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
- conn.Close()
- return
- }
-}
-
-func TestDoWebSocketWatch(t *testing.T) {
- s := httptest.NewServer(&watcherConn{})
-
- conn, _, _ := websocket.DefaultDialer.Dial(
- strings.Replace(s.URL, "http://", "ws://", 1), nil)
- //fmt.Print(conn)
- ws := NewWebSocket(context.Background(), conn)
- ws.Init()
-
- t.Run("start syncer center", func(t *testing.T) {
-
- GetSyncerNotifyCenter().Start()
-
- go DoWebSocketWatch(context.Background(), conn)
-
- go ws.HandleWatchWebSocketControlMessage()
-
- })
-
- t.Run("handle websocket job", func(t *testing.T) {
- // 1. add event to channel, handle event
- instanceEvent := &dump.WatchInstanceChangedEvent{
- Action: "CREATE",
- Service: &dump.Microservice{
- KV: &dump.KV{
- Key: "/cse-sr/ms/files/default/default/4042a6a3e5a2893698ae363ea99a69eb63fc51cd",
- Rev: 12,
- ClusterName: "clustername",
- },
- Value: &discovery.MicroService{
- ServiceId: "4042a6a3e5a2893698ae363ea99a69eb63fc51cd",
- AppId: "default",
- ServiceName: "TEST01",
- Version: "0.0.1",
- Level: "BACK",
- Schemas: []string{
- "servicecenter.grpc.api.ServiceCtrl",
- "servicecenter.grpc.api.ServiceInstanceCtrl",
- },
- },
- },
- Instance: &dump.Instance{
- KV: &dump.KV{
- Key: "/cse-sr/inst/files/default/default/4042a6a3e5a2893698ae363ea99a69eb63fc51cd/7a6be9f861a811e9b3f6fa163eca30e0",
- Rev: 21,
- ClusterName: "clustername",
- },
- Value: &discovery.MicroServiceInstance{
- InstanceId: "8e0fe4b961a811e981a6fa163e86b81a",
- ServiceId: "4042a6a3e5a2893698ae363ea99a69eb63fc51cd",
- Endpoints: []string{
- "rest://192.168.88.109:30100/",
- },
- HostName: "sunlisen",
- },
- },
- }
-
- GetSyncerNotifyCenter().AddEvent(instanceEvent)
-
- <-time.After(time.Second)
-
- // 2. handle unknown things
- ws.HandleWatchWebSocketJob(nil)
-
- // 3. handle error
- fakeErr := errors.New("fake error")
- ws.HandleWatchWebSocketJob(fakeErr)
-
- // 4. heartbeat
- err := ws.Heartbeat(websocket.PingMessage)
- assert.NoError(t, err)
- err = ws.Heartbeat(websocket.PongMessage)
- assert.NoError(t, err)
-
- closeCh <- struct{}{}
-
- <-time.After(time.Second)
-
- err = ws.Heartbeat(websocket.PingMessage)
- assert.Error(t, err)
- err = ws.Heartbeat(websocket.PongMessage)
- assert.Error(t, err)
-
- })
-
- Instance().Stop()
- GetSyncerNotifyCenter().Stop()
-}