You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/08/03 00:55:18 UTC
[servicecomb-service-center] branch master updated: Fix: Clean code
(#1120)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new f31c2a7 Fix: Clean code (#1120)
f31c2a7 is described below
commit f31c2a7291fac7ad26e58846785c7a41ac6c9f5e
Author: little-cui <su...@qq.com>
AuthorDate: Tue Aug 3 08:53:52 2021 +0800
Fix: Clean code (#1120)
---
datasource/dep.go | 1 -
datasource/etcd/dep.go | 4 -
datasource/etcd/event/instance_event_handler.go | 6 +-
datasource/etcd/event/tag_event_handler.go | 2 +-
datasource/mongo/dep.go | 4 -
datasource/mongo/event/instance_event_handler.go | 6 +-
server/api.go | 76 +++++---------
server/connection/grpc/stream.go | 83 ---------------
server/connection/grpc/stream_test.go | 60 -----------
server/event/instance_event.go | 12 +--
server/pubsub/pubsub.go | 58 +++++++++++
.../disco/watch_test.go => pubsub/pubsub_test.go} | 42 ++------
server/{connection => pubsub}/ws/broker.go | 0
server/{connection => pubsub}/ws/broker_test.go | 2 +-
server/{connection => pubsub}/ws/common.go | 0
server/{connection => pubsub}/ws/common_test.go | 6 +-
server/{connection => pubsub/ws}/connection.go | 2 +-
server/{connection => pubsub}/ws/health_check.go | 0
.../{connection => pubsub}/ws/health_check_test.go | 2 +-
server/{connection => pubsub}/ws/options.go | 8 +-
server/{connection => pubsub}/ws/websocket.go | 3 +-
server/{connection => pubsub}/ws/websocket_test.go | 14 +--
server/rest/controller/v4/instance_watcher.go | 7 +-
server/server.go | 35 +++----
server/service/disco/watch.go | 113 ---------------------
server/service/heartbeat/websocket.go | 13 ++-
26 files changed, 143 insertions(+), 416 deletions(-)
diff --git a/datasource/dep.go b/datasource/dep.go
index fb381c4..1be1222 100644
--- a/datasource/dep.go
+++ b/datasource/dep.go
@@ -28,6 +28,5 @@ type DependencyManager interface {
SearchProviderDependency(ctx context.Context, request *pb.GetDependenciesRequest) (*pb.GetProDependenciesResponse, error)
SearchConsumerDependency(ctx context.Context, request *pb.GetDependenciesRequest) (*pb.GetConDependenciesResponse, error)
AddOrUpdateDependencies(ctx context.Context, dependencyInfos []*pb.ConsumerDependency, override bool) (*pb.Response, error)
- DeleteDependency()
DependencyHandle(ctx context.Context) error
}
diff --git a/datasource/etcd/dep.go b/datasource/etcd/dep.go
index 9b5dc3d..88d5941 100644
--- a/datasource/etcd/dep.go
+++ b/datasource/etcd/dep.go
@@ -102,10 +102,6 @@ func (dm *DepManager) SearchConsumerDependency(ctx context.Context, request *pb.
}, nil
}
-func (dm *DepManager) DeleteDependency() {
- panic("implement me")
-}
-
func (dm *DepManager) DependencyHandle(ctx context.Context) error {
var dep *event.DependencyEventHandler
err := dep.Handle()
diff --git a/datasource/etcd/event/instance_event_handler.go b/datasource/etcd/event/instance_event_handler.go
index 93d1bd5..980206f 100644
--- a/datasource/etcd/event/instance_event_handler.go
+++ b/datasource/etcd/event/instance_event_handler.go
@@ -102,14 +102,14 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
return
}
- PublishInstanceEvent(evt, domainProject, pb.MicroServiceToKey(domainProject, ms), consumerIDs)
+ PublishInstanceEvent(evt, pb.MicroServiceToKey(domainProject, ms), consumerIDs)
}
func NewInstanceEventHandler() *InstanceEventHandler {
return &InstanceEventHandler{}
}
-func PublishInstanceEvent(evt sd.KvEvent, domainProject string, serviceKey *pb.MicroServiceKey, subscribers []string) {
+func PublishInstanceEvent(evt sd.KvEvent, serviceKey *pb.MicroServiceKey, subscribers []string) {
defer cache.FindInstances.Remove(serviceKey)
if len(subscribers) == 0 {
@@ -123,7 +123,7 @@ func PublishInstanceEvent(evt sd.KvEvent, domainProject string, serviceKey *pb.M
Instance: evt.KV.Value.(*pb.MicroServiceInstance),
}
for _, consumerID := range subscribers {
- evt := event.NewInstanceEventWithTime(consumerID, domainProject, evt.Revision, evt.CreateAt, response)
+ evt := event.NewInstanceEvent(consumerID, evt.Revision, evt.CreateAt, response)
err := event.Center().Fire(evt)
if err != nil {
log.Errorf(err, "publish event[%v] into channel failed", evt)
diff --git a/datasource/etcd/event/tag_event_handler.go b/datasource/etcd/event/tag_event_handler.go
index 17a2d31..65a402f 100644
--- a/datasource/etcd/event/tag_event_handler.go
+++ b/datasource/etcd/event/tag_event_handler.go
@@ -91,7 +91,7 @@ func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumer
}
providerKey := pb.MicroServiceToKey(domainProject, provider)
- PublishInstanceEvent(apt.KvEvent, domainProject, providerKey, []string{consumerID})
+ PublishInstanceEvent(apt.KvEvent, providerKey, []string{consumerID})
}
return nil
}
diff --git a/datasource/mongo/dep.go b/datasource/mongo/dep.go
index 5e17ddb..7216350 100644
--- a/datasource/mongo/dep.go
+++ b/datasource/mongo/dep.go
@@ -173,10 +173,6 @@ func (ds *DepManager) AddOrUpdateDependencies(ctx context.Context, dependencys [
return discovery.CreateResponse(discovery.ResponseSuccess, "Create dependency successfully."), nil
}
-func (ds *DepManager) DeleteDependency() {
- panic("implement me")
-}
-
func (ds *DepManager) DependencyHandle(ctx context.Context) (err error) {
return nil
}
diff --git a/datasource/mongo/event/instance_event_handler.go b/datasource/mongo/event/instance_event_handler.go
index fe80ba7..fa9925c 100644
--- a/datasource/mongo/event/instance_event_handler.go
+++ b/datasource/mongo/event/instance_event_handler.go
@@ -83,14 +83,14 @@ func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
providerID, microService.Environment, microService.AppId, microService.ServiceName, microService.Version), err)
return
}
- PublishInstanceEvent(evt, domainProject, discovery.MicroServiceToKey(domainProject, microService), consumerIDS)
+ PublishInstanceEvent(evt, discovery.MicroServiceToKey(domainProject, microService), consumerIDS)
}
func NewInstanceEventHandler() *InstanceEventHandler {
return &InstanceEventHandler{}
}
-func PublishInstanceEvent(evt sd.MongoEvent, domainProject string, serviceKey *discovery.MicroServiceKey, subscribers []string) {
+func PublishInstanceEvent(evt sd.MongoEvent, serviceKey *discovery.MicroServiceKey, subscribers []string) {
if len(subscribers) == 0 {
return
}
@@ -101,7 +101,7 @@ func PublishInstanceEvent(evt sd.MongoEvent, domainProject string, serviceKey *d
Instance: evt.Value.(model.Instance).Instance,
}
for _, consumerID := range subscribers {
- evt := event.NewInstanceEventWithTime(consumerID, domainProject, -1, simple.FromTime(time.Now()), response)
+ evt := event.NewInstanceEvent(consumerID, -1, simple.FromTime(time.Now()), response)
err := event.Center().Fire(evt)
if err != nil {
log.Error(fmt.Sprintf("publish event[%v] into channel failed", evt), err)
diff --git a/server/api.go b/server/api.go
index eb75d7a..0810428 100644
--- a/server/api.go
+++ b/server/api.go
@@ -21,7 +21,6 @@ import (
"context"
"fmt"
"net"
- "strconv"
"time"
"github.com/apache/servicecomb-service-center/server/service/disco"
@@ -53,34 +52,16 @@ func InitAPI() {
core.ServiceAPI = disco.AssembleResources()
}
-type APIType int64
-
-func (t APIType) String() string {
- switch t {
- case RPC:
- return "grpc" // support grpc
- case REST:
- return "rest"
- default:
- return "SCHEME" + strconv.Itoa(int(t))
- }
-}
-
type APIServer struct {
- Listeners map[APIType]string
+ Listeners []string
+ HTTPServer *rest.Server
- restSrv *rest.Server
isClose bool
forked bool
err chan error
goroutine *gopool.Pool
}
-const (
- RPC APIType = 0
- REST APIType = 1
-)
-
func (s *APIServer) Err() <-chan error {
return s.err
}
@@ -97,48 +78,43 @@ func (s *APIServer) MarkForked() {
s.forked = true
}
-func (s *APIServer) AddListener(t APIType, ip, port string) {
- if s.Listeners == nil {
- s.Listeners = map[APIType]string{}
- }
+func (s *APIServer) AddListener(ip, port string) {
if len(ip) == 0 {
return
}
- s.Listeners[t] = net.JoinHostPort(ip, port)
+ s.Listeners = append(s.Listeners, net.JoinHostPort(ip, port))
}
-func (s *APIServer) populateEndpoint(t APIType, ipPort string) {
+func (s *APIServer) populateEndpoint(ipPort string) {
if len(ipPort) == 0 {
return
}
- address := fmt.Sprintf("%s://%s/", t, ipPort)
+ address := fmt.Sprintf("rest://%s/", ipPort)
if config.GetSSL().SslEnabled {
address += "?sslEnabled=true"
}
core.Instance.Endpoints = append(core.Instance.Endpoints, address)
}
-func (s *APIServer) startRESTServer() (err error) {
- addr, ok := s.Listeners[REST]
- if !ok {
- return
- }
- s.restSrv, err = rs.NewServer(addr)
- if err != nil {
- return
- }
- log.Infof("listen address: %s://%s", REST, s.restSrv.Listener.Addr().String())
-
- s.populateEndpoint(REST, s.restSrv.Listener.Addr().String())
-
- s.goroutine.Do(func(_ context.Context) {
- err := s.restSrv.Serve()
- if s.isClose {
+func (s *APIServer) serve() (err error) {
+ for i, addr := range s.Listeners {
+ s.HTTPServer, err = rs.NewServer(addr)
+ if err != nil {
return
}
- log.Errorf(err, "error to start REST API server %s", addr)
- s.err <- err
- })
+ log.Infof("listen address[%d]: rest://%s", i, s.HTTPServer.Listener.Addr().String())
+
+ s.populateEndpoint(s.HTTPServer.Listener.Addr().String())
+
+ s.goroutine.Do(func(_ context.Context) {
+ err := s.HTTPServer.Serve()
+ if s.isClose {
+ return
+ }
+ log.Errorf(err, "error to serve %s", addr)
+ s.err <- err
+ })
+ }
return
}
@@ -150,7 +126,7 @@ func (s *APIServer) Start() {
core.Instance.Endpoints = nil
- err := s.startRESTServer()
+ err := s.serve()
if err != nil {
s.err <- err
return
@@ -179,8 +155,8 @@ func (s *APIServer) Stop() {
s.selfUnregister()
}
- if s.restSrv != nil {
- s.restSrv.Shutdown()
+ if s.HTTPServer != nil {
+ s.HTTPServer.Shutdown()
}
close(s.err)
diff --git a/server/connection/grpc/stream.go b/server/connection/grpc/stream.go
deleted file mode 100644
index b786c8f..0000000
--- a/server/connection/grpc/stream.go
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package grpc
-
-import (
- "context"
- "errors"
- "time"
-
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/proto"
- "github.com/apache/servicecomb-service-center/pkg/util"
- "github.com/apache/servicecomb-service-center/server/connection"
- "github.com/apache/servicecomb-service-center/server/event"
- "github.com/apache/servicecomb-service-center/server/metrics"
-)
-
-const GRPC = "gRPC"
-
-func Handle(watcher *event.InstanceSubscriber, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
- timer := time.NewTimer(connection.HeartbeatInterval)
- defer timer.Stop()
- for {
- select {
- case <-stream.Context().Done():
- return
- case <-timer.C:
- timer.Reset(connection.HeartbeatInterval)
- case job := <-watcher.Job:
- if job == nil {
- err = errors.New("channel is closed")
- log.Errorf(err, "watcher caught an exception, subject: %s, group: %s",
- watcher.Subject(), watcher.Group())
- return
- }
- if job.Response == nil {
- continue
- }
- resp := job.Response
- log.Infof("event is coming in, watcher, subject: %s, group: %s",
- watcher.Subject(), watcher.Group())
-
- err = stream.Send(resp)
- metrics.ReportPublishCompleted(job, err)
- if err != nil {
- log.Errorf(err, "send message error, subject: %s, group: %s",
- watcher.Subject(), watcher.Group())
- watcher.SetError(err)
- return
- }
- util.ResetTimer(timer, connection.HeartbeatInterval)
- }
- }
-}
-
-func Watch(ctx context.Context, serviceID string, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
- domainProject := util.ParseDomainProject(ctx)
- domain := util.ParseDomain(ctx)
- watcher := event.NewInstanceSubscriber(serviceID, domainProject)
- err = event.Center().AddSubscriber(watcher)
- if err != nil {
- return
- }
- metrics.ReportSubscriber(domain, GRPC, 1)
- err = Handle(watcher, stream)
- metrics.ReportSubscriber(domain, GRPC, -1)
- return
-}
diff --git a/server/connection/grpc/stream_test.go b/server/connection/grpc/stream_test.go
deleted file mode 100644
index 3bb7d78..0000000
--- a/server/connection/grpc/stream_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package grpc_test
-
-import (
- "context"
- "testing"
- "time"
-
- "github.com/apache/servicecomb-service-center/pkg/log"
- simple "github.com/apache/servicecomb-service-center/pkg/time"
- stream "github.com/apache/servicecomb-service-center/server/connection/grpc"
- "github.com/apache/servicecomb-service-center/server/event"
- pb "github.com/go-chassis/cari/discovery"
- "google.golang.org/grpc"
-)
-
-type grpcWatchServer struct {
- grpc.ServerStream
-}
-
-func (x *grpcWatchServer) Send(m *pb.WatchInstanceResponse) error {
- return nil
-}
-
-func (x *grpcWatchServer) Context() context.Context {
- return context.Background()
-}
-
-func TestHandleWatchJob(t *testing.T) {
- w := event.NewInstanceSubscriber("g", "s")
- w.Job <- nil
- err := stream.Handle(w, &grpcWatchServer{})
- if err == nil {
- t.Fatalf("TestHandleWatchJob failed")
- }
- w.Job <- event.NewInstanceEventWithTime("g", "s", 1, simple.FromTime(time.Now()), nil)
- w.Job <- nil
- stream.Handle(w, &grpcWatchServer{})
-}
-
-func TestDoStreamListAndWatch(t *testing.T) {
- defer log.Recover()
- err := stream.Watch(context.Background(), "s", nil)
- t.Fatal("TestDoStreamListAndWatch failed", err)
-}
diff --git a/server/event/instance_event.go b/server/event/instance_event.go
index 74f30fc..9266573 100644
--- a/server/event/instance_event.go
+++ b/server/event/instance_event.go
@@ -34,17 +34,9 @@ type InstanceEvent struct {
Response *pb.WatchInstanceResponse
}
-func NewInstanceEvent(serviceID, domainProject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent {
+func NewInstanceEvent(serviceID string, rev int64, createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
return &InstanceEvent{
- Event: event.NewEvent(INSTANCE, domainProject, serviceID),
- Revision: rev,
- Response: response,
- }
-}
-
-func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
- return &InstanceEvent{
- Event: event.NewEventWithTime(INSTANCE, domainProject, serviceID, createAt),
+ Event: event.NewEventWithTime(INSTANCE, response.Key.Tenant, serviceID, createAt),
Revision: rev,
Response: response,
}
diff --git a/server/pubsub/pubsub.go b/server/pubsub/pubsub.go
new file mode 100644
index 0000000..278387d
--- /dev/null
+++ b/server/pubsub/pubsub.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pubsub
+
+import (
+ "context"
+ "errors"
+
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/gorilla/websocket"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/server/pubsub/ws"
+)
+
+var ErrRequiredServiceID = errors.New("required the serviceID")
+
+// Watch listen the provider instance events by serviceID
+func Watch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
+ log.Infof("new a web socket watch with service[%s]", in.SelfServiceId)
+ if err := ExistService(ctx, in.SelfServiceId); err != nil {
+ ws.SendEstablishError(conn, err)
+ return
+ }
+ ws.Watch(ctx, in.SelfServiceId, conn)
+}
+func ExistService(ctx context.Context, selfServiceID string) error {
+ if len(selfServiceID) == 0 {
+ return ErrRequiredServiceID
+ }
+ resp, err := datasource.GetMetadataManager().ExistServiceByID(ctx, &pb.GetExistenceByIDRequest{
+ ServiceId: selfServiceID,
+ })
+ if err != nil {
+ log.Error("", err)
+ return err
+ }
+ if !resp.Exist {
+ return datasource.ErrServiceNotExists
+ }
+ return nil
+}
diff --git a/server/service/disco/watch_test.go b/server/pubsub/pubsub_test.go
similarity index 68%
rename from server/service/disco/watch_test.go
rename to server/pubsub/pubsub_test.go
index 46168d8..6edf1e8 100644
--- a/server/service/disco/watch_test.go
+++ b/server/pubsub/pubsub_test.go
@@ -14,37 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package disco_test
+package pubsub_test
import (
"context"
- "testing"
-
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/pubsub"
"github.com/apache/servicecomb-service-center/server/service/disco"
+ "testing"
pb "github.com/go-chassis/cari/discovery"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "google.golang.org/grpc"
)
-type grpcWatchServer struct {
- grpc.ServerStream
-}
-
-func (x *grpcWatchServer) Send(m *pb.WatchInstanceResponse) error {
- return nil
-}
-
-func (x *grpcWatchServer) Context() context.Context {
- return getContext()
+func getContext() context.Context {
+ return util.WithNoCache(util.SetDomainProject(context.Background(), "default", "default"))
}
func TestInstanceService_WebSocketWatch(t *testing.T) {
defer func() {
recover()
}()
- disco.WebSocketWatch(context.Background(), &pb.WatchInstanceRequest{}, nil)
+ pubsub.Watch(context.Background(), &pb.WatchInstanceRequest{}, nil)
}
var _ = Describe("'Instance' service", func() {
@@ -54,7 +46,7 @@ var _ = Describe("'Instance' service", func() {
)
It("should be passed", func() {
- respCreate, err := serviceResource.Create(getContext(), &pb.CreateServiceRequest{
+ respCreate, err := disco.RegisterService(getContext(), &pb.CreateServiceRequest{
Service: &pb.MicroService{
ServiceName: "service_name_watch",
AppId: "service_name_watch",
@@ -71,27 +63,15 @@ var _ = Describe("'Instance' service", func() {
Context("when request is invalid", func() {
It("should be failed", func() {
By("service does not exist")
- err := disco.WatchPreOpera(getContext(), &pb.WatchInstanceRequest{
- SelfServiceId: "-1",
- })
- Expect(err).NotTo(BeNil())
-
- err = disco.Watch(&pb.WatchInstanceRequest{
- SelfServiceId: "-1",
- }, &grpcWatchServer{})
+ err := pubsub.ExistService(getContext(), "-1")
Expect(err).NotTo(BeNil())
By("service id is empty")
- err = disco.WatchPreOpera(getContext(), &pb.WatchInstanceRequest{
- SelfServiceId: "",
- })
+ err = pubsub.ExistService(getContext(), "")
Expect(err).NotTo(BeNil())
By("request is valid")
- err = disco.WatchPreOpera(getContext(),
- &pb.WatchInstanceRequest{
- SelfServiceId: serviceId,
- })
+ err = pubsub.ExistService(getContext(), serviceId)
Expect(err).To(BeNil())
})
})
diff --git a/server/connection/ws/broker.go b/server/pubsub/ws/broker.go
similarity index 100%
rename from server/connection/ws/broker.go
rename to server/pubsub/ws/broker.go
diff --git a/server/connection/ws/broker_test.go b/server/pubsub/ws/broker_test.go
similarity index 95%
rename from server/connection/ws/broker_test.go
rename to server/pubsub/ws/broker_test.go
index 2c95501..55241fb 100644
--- a/server/connection/ws/broker_test.go
+++ b/server/pubsub/ws/broker_test.go
@@ -21,8 +21,8 @@ import (
"context"
"testing"
- "github.com/apache/servicecomb-service-center/server/connection/ws"
"github.com/apache/servicecomb-service-center/server/event"
+ "github.com/apache/servicecomb-service-center/server/pubsub/ws"
"github.com/stretchr/testify/assert"
)
diff --git a/server/connection/ws/common.go b/server/pubsub/ws/common.go
similarity index 100%
rename from server/connection/ws/common.go
rename to server/pubsub/ws/common.go
diff --git a/server/connection/ws/common_test.go b/server/pubsub/ws/common_test.go
similarity index 88%
rename from server/connection/ws/common_test.go
rename to server/pubsub/ws/common_test.go
index df8ae55..95656db 100644
--- a/server/connection/ws/common_test.go
+++ b/server/pubsub/ws/common_test.go
@@ -22,14 +22,14 @@ import (
"errors"
"testing"
- wss "github.com/apache/servicecomb-service-center/server/connection/ws"
+ "github.com/apache/servicecomb-service-center/server/pubsub/ws"
"github.com/stretchr/testify/assert"
)
func TestSendEstablishError(t *testing.T) {
mock := NewTest()
t.Run("should read the err when call", func(t *testing.T) {
- wss.SendEstablishError(mock.ServerConn, errors.New("error"))
+ ws.SendEstablishError(mock.ServerConn, errors.New("error"))
_, message, err := mock.ClientConn.ReadMessage()
assert.Nil(t, err)
assert.Equal(t, "error", string(message))
@@ -42,6 +42,6 @@ func TestWatch(t *testing.T) {
mock.ServerConn.Close()
ctx, cancel := context.WithCancel(context.Background())
cancel()
- wss.Watch(ctx, "", mock.ServerConn)
+ ws.Watch(ctx, "", mock.ServerConn)
})
}
diff --git a/server/connection/connection.go b/server/pubsub/ws/connection.go
similarity index 98%
rename from server/connection/connection.go
rename to server/pubsub/ws/connection.go
index c34005f..fb198ca 100644
--- a/server/connection/connection.go
+++ b/server/pubsub/ws/connection.go
@@ -16,7 +16,7 @@
*/
// connection pkg impl the pub/sub mechanism of the long connection of diff protocols
-package connection
+package ws
import "time"
diff --git a/server/connection/ws/health_check.go b/server/pubsub/ws/health_check.go
similarity index 100%
rename from server/connection/ws/health_check.go
rename to server/pubsub/ws/health_check.go
diff --git a/server/connection/ws/health_check_test.go b/server/pubsub/ws/health_check_test.go
similarity index 95%
rename from server/connection/ws/health_check_test.go
rename to server/pubsub/ws/health_check_test.go
index 9928f3e..1dfbaf1 100644
--- a/server/connection/ws/health_check_test.go
+++ b/server/pubsub/ws/health_check_test.go
@@ -20,7 +20,7 @@ package ws_test
import (
"testing"
- "github.com/apache/servicecomb-service-center/server/connection/ws"
+ "github.com/apache/servicecomb-service-center/server/pubsub/ws"
"github.com/stretchr/testify/assert"
)
diff --git a/server/connection/ws/options.go b/server/pubsub/ws/options.go
similarity index 83%
rename from server/connection/ws/options.go
rename to server/pubsub/ws/options.go
index 3196623..a3f1cd7 100644
--- a/server/connection/ws/options.go
+++ b/server/pubsub/ws/options.go
@@ -19,8 +19,6 @@ package ws
import (
"time"
-
- "github.com/apache/servicecomb-service-center/server/connection"
)
type Options struct {
@@ -31,8 +29,8 @@ type Options struct {
func ToOptions() Options {
return Options{
- ReadTimeout: connection.ReadTimeout,
- SendTimeout: connection.SendTimeout,
- HealthInterval: connection.HeartbeatInterval,
+ ReadTimeout: ReadTimeout,
+ SendTimeout: SendTimeout,
+ HealthInterval: HeartbeatInterval,
}
}
diff --git a/server/connection/ws/websocket.go b/server/pubsub/ws/websocket.go
similarity index 97%
rename from server/connection/ws/websocket.go
rename to server/pubsub/ws/websocket.go
index 0014b8c..14482bd 100644
--- a/server/connection/ws/websocket.go
+++ b/server/pubsub/ws/websocket.go
@@ -25,7 +25,6 @@ import (
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
- "github.com/apache/servicecomb-service-center/server/connection"
pb "github.com/go-chassis/cari/discovery"
"github.com/gorilla/websocket"
)
@@ -98,7 +97,7 @@ func (wh *WebSocket) registerMessageHandler() {
}
func (wh *WebSocket) ReadMessage() error {
- wh.Conn.SetReadLimit(connection.ReadMaxBody)
+ wh.Conn.SetReadLimit(ReadMaxBody)
err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
if err != nil {
log.Error("", err)
diff --git a/server/connection/ws/websocket_test.go b/server/pubsub/ws/websocket_test.go
similarity index 92%
rename from server/connection/ws/websocket_test.go
rename to server/pubsub/ws/websocket_test.go
index cd4ad8b..deb1d19 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/pubsub/ws/websocket_test.go
@@ -26,9 +26,9 @@ import (
"testing"
"time"
- wss "github.com/apache/servicecomb-service-center/server/connection/ws"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/event"
+ "github.com/apache/servicecomb-service-center/server/pubsub/ws"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
)
@@ -81,15 +81,15 @@ func NewTest() *watcherConn {
func TestNewWebSocket(t *testing.T) {
mock := NewTest()
t.Run("should return not nil when new", func(t *testing.T) {
- assert.NotNil(t, wss.NewWebSocket("", "", mock.ServerConn))
+ assert.NotNil(t, ws.NewWebSocket("", "", mock.ServerConn))
})
}
func TestWebSocket_NeedCheck(t *testing.T) {
mock := NewTest()
conn := mock.ServerConn
- options := wss.ToOptions()
- webSocket := &wss.WebSocket{
+ options := ws.ToOptions()
+ webSocket := &ws.WebSocket{
Options: options,
DomainProject: "default",
ConsumerID: "",
@@ -119,7 +119,7 @@ func TestWebSocket_NeedCheck(t *testing.T) {
func TestWebSocket_Idle(t *testing.T) {
mock := NewTest()
- webSocket := wss.NewWebSocket("", "", mock.ServerConn)
+ webSocket := ws.NewWebSocket("", "", mock.ServerConn)
t.Run("should idle when new", func(t *testing.T) {
select {
@@ -156,13 +156,13 @@ func TestWebSocket_CheckHealth(t *testing.T) {
event.Center().Start()
t.Run("should do nothing when recv PING", func(t *testing.T) {
- ws := wss.NewWebSocket("", "", mock.ServerConn)
+ ws := ws.NewWebSocket("", "", mock.ServerConn)
mock.ClientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
<-time.After(time.Second)
assert.Nil(t, ws.CheckHealth(context.Background()))
})
t.Run("should return err when consumer not exist", func(t *testing.T) {
- ws := wss.NewWebSocket("", "", mock.ServerConn)
+ ws := ws.NewWebSocket("", "", mock.ServerConn)
assert.Equal(t, "service does not exist", ws.CheckHealth(context.Background()).Error())
})
}
diff --git a/server/rest/controller/v4/instance_watcher.go b/server/rest/controller/v4/instance_watcher.go
index b9f9bf8..24ddb4d 100644
--- a/server/rest/controller/v4/instance_watcher.go
+++ b/server/rest/controller/v4/instance_watcher.go
@@ -20,12 +20,11 @@ package v4
import (
"net/http"
- discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
- "github.com/apache/servicecomb-service-center/server/service/heartbeat"
-
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/rest"
"github.com/apache/servicecomb-service-center/server/handler/exception"
+ "github.com/apache/servicecomb-service-center/server/pubsub"
+ "github.com/apache/servicecomb-service-center/server/service/heartbeat"
pb "github.com/go-chassis/cari/discovery"
"github.com/gorilla/websocket"
)
@@ -72,7 +71,7 @@ func (s *WatchService) Watch(w http.ResponseWriter, r *http.Request) {
defer conn.Close()
r.Method = "WATCH"
- discosvc.WebSocketWatch(r.Context(), &pb.WatchInstanceRequest{
+ pubsub.Watch(r.Context(), &pb.WatchInstanceRequest{
SelfServiceId: r.URL.Query().Get(":serviceId"),
}, conn)
}
diff --git a/server/server.go b/server/server.go
index 87b430c..4be3771 100644
--- a/server/server.go
+++ b/server/server.go
@@ -60,10 +60,8 @@ type endpoint struct {
}
type ServiceCenterServer struct {
- REST endpoint
- GRPC endpoint
-
- apiService *APIServer
+ Endpoint endpoint
+ APIServer *APIServer
eventCenter *nf.BusService
syncerNotifyService *snf.Service
}
@@ -79,7 +77,7 @@ func (s *ServiceCenterServer) Run() {
}
func (s *ServiceCenterServer) waitForQuit() {
- err := <-s.apiService.Err()
+ err := <-s.APIServer.Err()
if err != nil {
log.Errorf(err, "service center catch errors")
}
@@ -95,16 +93,14 @@ func (s *ServiceCenterServer) initialize() {
s.initSSL()
// Datasource
s.initDatasource()
- s.apiService = GetAPIServer()
+ s.APIServer = GetAPIServer()
s.eventCenter = event.Center()
s.syncerNotifyService = snf.GetSyncerNotifyCenter()
}
func (s *ServiceCenterServer) initEndpoints() {
- s.REST.Host = config.GetString("server.host", "", config.WithStandby("httpaddr"))
- s.REST.Port = config.GetString("server.port", "", config.WithStandby("httpport"))
- s.GRPC.Host = config.GetString("server.rpc.host", "", config.WithStandby("rpcaddr"))
- s.GRPC.Port = config.GetString("server.rpc.port", "", config.WithStandby("rpcport"))
+ s.Endpoint.Host = config.GetString("server.host", "", config.WithStandby("httpaddr"))
+ s.Endpoint.Port = config.GetString("server.port", "", config.WithStandby("httpport"))
}
func (s *ServiceCenterServer) initDatasource() {
@@ -138,14 +134,10 @@ func (s *ServiceCenterServer) initMetrics() {
interval = defaultCollectPeriod
}
var instance string
- if len(s.REST.Host) > 0 {
- instance = net.JoinHostPort(s.REST.Host, s.REST.Port)
+ if len(s.Endpoint.Host) > 0 {
+ instance = net.JoinHostPort(s.Endpoint.Host, s.Endpoint.Port)
} else {
- if len(s.GRPC.Host) > 0 {
- instance = net.JoinHostPort(s.GRPC.Host, s.GRPC.Port)
- } else {
- log.Fatal("init metrics InstanceName failed", nil)
- }
+ log.Fatal("init metrics InstanceName failed", nil)
}
if err := metrics.Init(metrics.Options{
@@ -207,14 +199,13 @@ func (s *ServiceCenterServer) startServices() {
func (s *ServiceCenterServer) startAPIService() {
core.Instance.HostName = util.HostName()
- s.apiService.AddListener(REST, s.REST.Host, s.REST.Port)
- s.apiService.AddListener(RPC, s.REST.Host, s.GRPC.Port)
- s.apiService.Start()
+ s.APIServer.AddListener(s.Endpoint.Host, s.Endpoint.Port)
+ s.APIServer.Start()
}
func (s *ServiceCenterServer) Stop() {
- if s.apiService != nil {
- s.apiService.Stop()
+ if s.APIServer != nil {
+ s.APIServer.Stop()
}
if s.eventCenter != nil {
diff --git a/server/service/disco/watch.go b/server/service/disco/watch.go
deleted file mode 100644
index 8870816..0000000
--- a/server/service/disco/watch.go
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package disco
-
-import (
- "context"
- "errors"
- "fmt"
-
- pb "github.com/go-chassis/cari/discovery"
- "github.com/gorilla/websocket"
-
- "github.com/apache/servicecomb-service-center/datasource"
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/proto"
- "github.com/apache/servicecomb-service-center/server/connection/grpc"
- "github.com/apache/servicecomb-service-center/server/connection/ws"
-)
-
-func WatchPreOpera(ctx context.Context, in *pb.WatchInstanceRequest) error {
- if in == nil || len(in.SelfServiceId) == 0 {
- return errors.New("request format invalid")
- }
- resp, err := datasource.GetMetadataManager().ExistServiceByID(ctx, &pb.GetExistenceByIDRequest{
- ServiceId: in.SelfServiceId,
- })
- if err != nil {
- log.Error("", err)
- return err
- }
- if !resp.Exist {
- return datasource.ErrServiceNotExists
- }
- return nil
-}
-
-func Watch(in *pb.WatchInstanceRequest, stream proto.ServiceInstanceCtrlWatchServer) error {
- log.Infof("new a stream list and watch with service[%s]", in.SelfServiceId)
- if err := WatchPreOpera(stream.Context(), in); err != nil {
- log.Errorf(err, "service[%s] establish watch failed: invalid params", in.SelfServiceId)
- return err
- }
-
- return grpc.Watch(stream.Context(), in.SelfServiceId, stream)
-}
-
-func WebSocketWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
- log.Infof("new a web socket watch with service[%s]", in.SelfServiceId)
- if err := WatchPreOpera(ctx, in); err != nil {
- ws.SendEstablishError(conn, err)
- return
- }
- ws.Watch(ctx, in.SelfServiceId, conn)
-}
-
-func QueryAllProvidersInstances(ctx context.Context, in *pb.WatchInstanceRequest) ([]*pb.WatchInstanceResponse, int64) {
- depResp, err := datasource.GetDependencyManager().SearchConsumerDependency(ctx, &pb.GetDependenciesRequest{
- ServiceId: in.SelfServiceId,
- })
- if err != nil {
- log.Error(fmt.Sprintf("search service[%s] dependencies failed", in.SelfServiceId), err)
- return nil, 0
- }
- if depResp.Response.GetCode() != pb.ResponseSuccess {
- log.Error(fmt.Sprintf("search service[%s] dependencies failed. %s",
- in.SelfServiceId, depResp.Response.GetMessage()), nil)
- return nil, 0
- }
- var results []*pb.WatchInstanceResponse
- for _, provider := range depResp.Providers {
- instResp, err := datasource.GetMetadataManager().GetInstances(ctx, &pb.GetInstancesRequest{
- ProviderServiceId: provider.ServiceId,
- })
- if err != nil {
- log.Error(fmt.Sprintf("get service[%s] instances failed", in.SelfServiceId), err)
- return nil, 0
- }
- if instResp.Response.GetCode() != pb.ResponseSuccess {
- log.Error(fmt.Sprintf("get service[%s] instances failed. %s",
- in.SelfServiceId, instResp.Response.GetMessage()), nil)
- return nil, 0
- }
- for _, instance := range instResp.Instances {
- results = append(results, &pb.WatchInstanceResponse{
- Response: pb.CreateResponse(pb.ResponseSuccess, "List instance successfully."),
- Action: string(pb.EVT_INIT),
- Key: &pb.MicroServiceKey{
- Environment: provider.Environment,
- AppId: provider.AppId,
- ServiceName: provider.ServiceName,
- Version: provider.Version,
- },
- Instance: instance,
- })
- }
- }
- return results, 0
-}
diff --git a/server/service/heartbeat/websocket.go b/server/service/heartbeat/websocket.go
index 263ae2e..71cde2e 100644
--- a/server/service/heartbeat/websocket.go
+++ b/server/service/heartbeat/websocket.go
@@ -24,17 +24,16 @@ import (
"sync"
"time"
- "github.com/apache/servicecomb-service-center/datasource"
- discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
-
pb "github.com/go-chassis/cari/discovery"
"github.com/gorilla/websocket"
+ "github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/config"
- "github.com/apache/servicecomb-service-center/server/connection"
"github.com/apache/servicecomb-service-center/server/metrics"
+ "github.com/apache/servicecomb-service-center/server/pubsub/ws"
+ discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
)
const (
@@ -81,7 +80,7 @@ func (c *client) sendClose(code int, text string) error {
if code != websocket.CloseNoStatusReceived {
message = websocket.FormatCloseMessage(code, text)
}
- err := c.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(connection.SendTimeout))
+ err := c.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(ws.SendTimeout))
if err != nil {
log.Error(fmt.Sprintf("watcher[%s] catch an err", remoteAddr), err)
return err
@@ -98,7 +97,7 @@ func (c *client) heartbeat() {
}()
for {
<-ticker.C
- err := c.conn.SetWriteDeadline(time.Now().Add(connection.SendTimeout))
+ err := c.conn.SetWriteDeadline(time.Now().Add(ws.SendTimeout))
if err != nil {
log.Error("", err)
}
@@ -116,7 +115,7 @@ func (c *client) handleMessage() {
remoteAddr := c.conn.RemoteAddr().String()
c.conn.SetPongHandler(func(message string) error {
- err := c.conn.SetReadDeadline(time.Now().Add(connection.ReadTimeout))
+ err := c.conn.SetReadDeadline(time.Now().Add(ws.ReadTimeout))
if err != nil {
log.Error("", err)
}