You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/05/20 01:45:30 UTC
[servicecomb-service-center] branch master updated: SCB-2176
Refactor websocket (#986)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new cc0f684 SCB-2176 Refactor websocket (#986)
cc0f684 is described below
commit cc0f684449f6606c45c86d53508428a50eb32b84
Author: little-cui <su...@qq.com>
AuthorDate: Thu May 20 09:45:23 2021 +0800
SCB-2176 Refactor websocket (#986)
* SCB-2176 Refactor event bus
* SCB-2176 Fix: UT failures
* SCB-2176 Refactor websocket (#980)
* SCB-2176 Refactor websocket
* SCB-2176 Add UTs
* SCB-2176 Fix: UT failures
* SCB-2176 Fix: websocket health check failed
* SCB-2176 Add benchmark for 10K connection
* SCB-2176 Add benchmark for 10K connection
* SCB-2176 Add event metrics
---
datasource/etcd/event/instance_event_handler.go | 29 +-
datasource/etcd/sd/etcd/cacher_kv.go | 1 +
datasource/etcd/sd/metrics.go | 32 +++
datasource/etcd/sd/servicecenter/common.go | 30 +-
datasource/etcd/util/microservice_util.go | 7 -
docs/user-guides/metrics.md | 4 +
examples/infrastructures/docker/README.md | 2 +-
integration/apis.go | 1 -
integration/health-metrics-grafana.json | 174 ++++++++----
integration/instances_test.go | 133 ++++++++-
pkg/event/bus.go | 6 +-
pkg/event/subscriber.go | 5 +-
pkg/proto/service_ex.go | 1 -
pkg/util/context.go | 14 +-
server/connection/grpc/stream.go | 7 +-
server/connection/grpc/stream_test.go | 4 +-
server/connection/ws/broker.go | 81 ++++++
.../connection/ws/broker_test.go | 28 +-
server/connection/ws/common.go | 68 +++++
.../connection/ws/common_test.go | 33 ++-
.../ws/{publisher.go => health_check.go} | 87 +++---
.../ws/health_check_test.go} | 28 +-
.../ws/options.go} | 23 +-
server/connection/ws/websocket.go | 307 +++++++--------------
server/connection/ws/websocket_test.go | 174 +++++++-----
server/event/instance_event.go | 51 ++++
server/event/instance_subscriber.go | 131 +++------
server/metrics/connection.go | 26 +-
server/rest/controller/v3/instance_watcher.go | 1 -
server/rest/controller/v4/instance_watcher.go | 14 -
server/service/watch.go | 15 +-
server/service/watch_test.go | 7 -
32 files changed, 904 insertions(+), 620 deletions(-)
diff --git a/datasource/etcd/event/instance_event_handler.go b/datasource/etcd/event/instance_event_handler.go
index 7297b7c..b247c2a 100644
--- a/datasource/etcd/event/instance_event_handler.go
+++ b/datasource/etcd/event/instance_event_handler.go
@@ -22,8 +22,6 @@ import (
"fmt"
"strings"
- pb "github.com/go-chassis/cari/discovery"
-
"github.com/apache/servicecomb-service-center/datasource/etcd/cache"
"github.com/apache/servicecomb-service-center/datasource/etcd/kv"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
@@ -36,6 +34,7 @@ import (
"github.com/apache/servicecomb-service-center/server/event"
"github.com/apache/servicecomb-service-center/server/metrics"
"github.com/apache/servicecomb-service-center/server/syncernotify"
+ pb "github.com/go-chassis/cari/discovery"
)
const (
@@ -65,12 +64,13 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
domainName := domainProject[:idx]
projectName := domainProject[idx+1:]
+ ctx := util.WithGlobal(util.WithCacheOnly(context.Background()))
+
var count float64 = increaseOne
- switch action {
- case pb.EVT_INIT:
+ if action == pb.EVT_INIT {
metrics.ReportInstances(domainName, count)
- ms := serviceUtil.GetServiceFromCache(domainProject, providerID)
- if ms == nil {
+ ms, err := serviceUtil.GetService(ctx, domainProject, providerID)
+ if err != nil {
log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, get cached provider's file failed",
action, providerID, providerInstanceID, instance.Endpoints)
return
@@ -78,11 +78,10 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
frameworkName, frameworkVersion := getFramework(ms)
metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
return
- case pb.EVT_CREATE:
- metrics.ReportInstances(domainName, count)
- case pb.EVT_DELETE:
+ }
+
+ if action == pb.EVT_DELETE {
count = decreaseOne
- metrics.ReportInstances(domainName, count)
if !core.IsDefaultDomainProject(domainProject) {
projectName := domainProject[idx+1:]
serviceUtil.RemandInstanceQuota(
@@ -91,9 +90,7 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
}
// 查询服务版本信息
- ctx := util.WithGlobal(util.WithCacheOnly(context.Background()))
ms, err := serviceUtil.GetService(ctx, domainProject, providerID)
-
if err != nil {
log.Error(fmt.Sprintf("caught [%s] instance[%s/%s] event, endpoints %v, get cached provider's file failed",
action, providerID, providerInstanceID, instance.Endpoints), err)
@@ -110,8 +107,11 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
return
}
- frameworkName, frameworkVersion := getFramework(ms)
- metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
+ if action != pb.EVT_UPDATE {
+ frameworkName, frameworkVersion := getFramework(ms)
+ metrics.ReportInstances(domainName, count)
+ metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
+ }
log.Infof("caught [%s] service[%s][%s/%s/%s/%s] instance[%s] event, endpoints %v",
action, providerID, ms.Environment, ms.AppId, ms.ServiceName, ms.Version,
@@ -146,7 +146,6 @@ func PublishInstanceEvent(evt sd.KvEvent, domainProject string, serviceKey *pb.M
Instance: evt.KV.Value.(*pb.MicroServiceInstance),
}
for _, consumerID := range subscribers {
- // TODO add超时怎么处理?
evt := event.NewInstanceEventWithTime(consumerID, domainProject, evt.Revision, evt.CreateAt, response)
err := event.Center().Fire(evt)
if err != nil {
diff --git a/datasource/etcd/sd/etcd/cacher_kv.go b/datasource/etcd/sd/etcd/cacher_kv.go
index 11eb70a..4606fab 100644
--- a/datasource/etcd/sd/etcd/cacher_kv.go
+++ b/datasource/etcd/sd/etcd/cacher_kv.go
@@ -454,6 +454,7 @@ func (c *KvCacher) notify(evts []sd.KvEvent) {
for _, evt := range evts {
c.Cfg.OnEvent(evt)
}
+ sd.ReportDispatchEventCompleted(c.Cfg.Key, evts)
}
func (c *KvCacher) doParse(src *sdcommon.Resource) (kv *sd.KeyValue) {
diff --git a/datasource/etcd/sd/metrics.go b/datasource/etcd/sd/metrics.go
index 82e38c0..f25e30b 100644
--- a/datasource/etcd/sd/metrics.go
+++ b/datasource/etcd/sd/metrics.go
@@ -43,6 +43,23 @@ var (
Help: "Latency of backend events processing",
Objectives: metrics.Pxx,
}, []string{"instance", "prefix"})
+
+ dispatchCounter = helper.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: metrics.FamilyName,
+ Subsystem: "db",
+ Name: "dispatch_event_total",
+ Help: "Counter of backend events dispatch",
+ }, []string{"instance", "prefix"})
+
+ dispatchLatency = helper.NewSummaryVec(
+ prometheus.SummaryOpts{
+ Namespace: metrics.FamilyName,
+ Subsystem: "db",
+ Name: "dispatch_event_durations_microseconds",
+ Help: "Latency of backend events dispatch",
+ Objectives: metrics.Pxx,
+ }, []string{"instance", "prefix"})
)
func ReportProcessEventCompleted(prefix string, evts []KvEvent) {
@@ -57,4 +74,19 @@ func ReportProcessEventCompleted(prefix string, evts []KvEvent) {
eventsLatency.WithLabelValues(instance, prefix).Observe(elapsed)
}
eventsCounter.WithLabelValues(instance, prefix).Add(l)
+ dispatchCounter.WithLabelValues(instance, prefix).Add(l)
+}
+
+func ReportDispatchEventCompleted(prefix string, evts []KvEvent) {
+ l := float64(len(evts))
+ if l == 0 {
+ return
+ }
+ instance := metrics.InstanceName()
+ now := time.Now()
+ for _, evt := range evts {
+ elapsed := float64(now.Sub(evt.CreateAt.Local()).Nanoseconds()) / float64(time.Microsecond)
+ dispatchLatency.WithLabelValues(instance, prefix).Observe(elapsed)
+ }
+ dispatchCounter.WithLabelValues(instance, prefix).Add(-l)
}
diff --git a/datasource/etcd/sd/servicecenter/common.go b/datasource/etcd/sd/servicecenter/common.go
index d59957f..a107db1 100644
--- a/datasource/etcd/sd/servicecenter/common.go
+++ b/datasource/etcd/sd/servicecenter/common.go
@@ -1,17 +1,19 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package servicecenter
diff --git a/datasource/etcd/util/microservice_util.go b/datasource/etcd/util/microservice_util.go
index d7c30ef..6c45a06 100644
--- a/datasource/etcd/util/microservice_util.go
+++ b/datasource/etcd/util/microservice_util.go
@@ -64,13 +64,6 @@ func GetService(ctx context.Context, domainProject string, serviceID string) (*p
return serviceResp.Kvs[0].Value.(*pb.MicroService), nil
}
-// GetServiceFromCache gets service from cache
-func GetServiceFromCache(domainProject string, serviceID string) *pb.MicroService {
- ctx := util.WithGlobal(util.WithCacheOnly(context.Background()))
- svc, _ := GetService(ctx, domainProject, serviceID)
- return svc
-}
-
func getServicesRawData(ctx context.Context, domainProject string) ([]*sd.KeyValue, error) {
key := path.GenerateServiceKey(domainProject, "")
opts := append(FromContext(ctx),
diff --git a/docs/user-guides/metrics.md b/docs/user-guides/metrics.md
index e3ead16..f815756 100644
--- a/docs/user-guides/metrics.md
+++ b/docs/user-guides/metrics.md
@@ -11,6 +11,8 @@
### Pub/Sub
1. notify_publish_total
1. notify_publish_durations_microseconds
+1. notify_pending_total
+1. notify_pending_durations_microseconds
1. notify_subscriber_total
### Meta
@@ -25,6 +27,8 @@
### Backend
1. db_backend_event_total
1. db_backend_event_durations_microseconds
+1. db_dispatch_event_total
+1. db_dispatch_event_durations_microseconds
1. db_backend_operation_total
1. db_backend_operation_durations_microseconds
1. db_backend_total
diff --git a/examples/infrastructures/docker/README.md b/examples/infrastructures/docker/README.md
index 1955c64..90c3b72 100644
--- a/examples/infrastructures/docker/README.md
+++ b/examples/infrastructures/docker/README.md
@@ -5,7 +5,7 @@ A simple demo to deploy ServiceCenter in docker environment.
## Quick Start
```bash
-cd $PROJECT_ROOT/integration/docker
+cd examples/infrastructures/docker
docker-compose up
```
This will start up ServiceCenter listening on `:30100` for handling requests and Dashboard listening on `:30103`.
diff --git a/integration/apis.go b/integration/apis.go
index d263a6b..f8aa164 100644
--- a/integration/apis.go
+++ b/integration/apis.go
@@ -47,7 +47,6 @@ var UPDATEINSTANCEMETADATA = "/v4/default/registry/microservices/:serviceId/inst
var UPDATEINSTANCESTATUS = "/v4/default/registry/microservices/:serviceId/instances/:instanceId/status"
var INSTANCEHEARTBEAT = "/v4/default/registry/microservices/:serviceId/instances/:instanceId/heartbeat"
var INSTANCEWATCHER = "/v4/default/registry/microservices/:serviceId/watcher"
-var INSTANCELISTWATCHER = "/v4/default/registry/microservices/:serviceId/listwatcher"
//Governance API's
var GETGOVERNANCESERVICEDETAILS = "/v4/default/govern/microservices/:serviceId"
diff --git a/integration/health-metrics-grafana.json b/integration/health-metrics-grafana.json
index f730b2c..84f963b 100644
--- a/integration/health-metrics-grafana.json
+++ b/integration/health-metrics-grafana.json
@@ -1,18 +1,3 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License
-
{
"__inputs": [
{
@@ -2183,7 +2168,7 @@
"steppedLine": false,
"targets": [
{
- "expr": "sum(irate(service_center_db_backend_event_total{job=\"service-center\"}[1m])) by (instance,prefix)",
+ "expr": "sum(irate(service_center_db_dispatch_event_total{job=\"service-center\"}[1m])) by (instance,prefix)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{instance}}> {{prefix}}",
@@ -2193,7 +2178,7 @@
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Publish Events",
+ "title": "Produce Events",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2277,7 +2262,7 @@
"steppedLine": false,
"targets": [
{
- "expr": "max(avg_over_time(service_center_db_backend_event_durations_microseconds{job=\"service-center\"}[1m])) by (instance, prefix)",
+ "expr": "max(avg_over_time(service_center_db_dispatch_event_durations_microseconds{job=\"service-center\"}[1m])) by (instance, prefix)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{instance}}> {{prefix}}",
@@ -2287,7 +2272,7 @@
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Publish Events Latency",
+ "title": "Produce Events Latency",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2381,7 +2366,7 @@
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Subscribe Events",
+ "title": "Post Events",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2475,7 +2460,7 @@
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Subscribe Events Latency",
+ "title": "Post Events Latency",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2526,8 +2511,91 @@
"x": 0,
"y": 40
},
+ "id": 48,
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": false,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "sum(service_center_notify_pending_total{job=\"service-center\"}) by (instance,source)",
+ "format": "time_series",
+ "intervalFactor": 1,
+ "refId": "A"
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Pending Events",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_LOCAL}",
+ "fill": 1,
+ "gridPos": {
+ "h": 6,
+ "w": 6,
+ "x": 6,
+ "y": 40
+ },
"height": "",
- "id": 37,
+ "id": 43,
"legend": {
"alignAsTable": false,
"avg": false,
@@ -2558,18 +2626,17 @@
"steppedLine": false,
"targets": [
{
- "expr": "sum(irate(service_center_db_backend_operation_total{job=\"service-center\"}[1m])) by (instance,operation)",
+ "expr": "service_center_notify_subscriber_total{job=\"service-center\"}",
"format": "time_series",
- "instant": false,
"intervalFactor": 1,
- "legendFormat": "{{instance}}> {{operation}}",
- "refId": "A"
+ "legendFormat": "{{instance}}> {{domain}} {{scheme}}",
+ "refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Backend Operations",
+ "title": "Subscribers",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2587,7 +2654,7 @@
"yaxes": [
{
"decimals": 0,
- "format": "ops",
+ "format": "none",
"label": null,
"logBase": 1,
"max": null,
@@ -2618,25 +2685,25 @@
"gridPos": {
"h": 6,
"w": 6,
- "x": 6,
+ "x": 12,
"y": 40
},
"height": "",
- "id": 38,
+ "id": 37,
"legend": {
"alignAsTable": false,
- "avg": true,
+ "avg": false,
"current": false,
"hideEmpty": true,
"hideZero": true,
- "max": true,
- "min": true,
- "rightSide": true,
+ "max": false,
+ "min": false,
+ "rightSide": false,
"show": false,
"sort": "avg",
"sortDesc": true,
"total": false,
- "values": true
+ "values": false
},
"lines": true,
"linewidth": 1,
@@ -2653,17 +2720,18 @@
"steppedLine": false,
"targets": [
{
- "expr": "max(avg_over_time(service_center_db_backend_operation_durations_microseconds{job=\"service-center\"}[1m])) by (instance,operation)",
+ "expr": "sum(irate(service_center_db_backend_operation_total{job=\"service-center\"}[1m])) by (instance,operation)",
"format": "time_series",
+ "instant": false,
"intervalFactor": 1,
"legendFormat": "{{instance}}> {{operation}}",
- "refId": "B"
+ "refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Backend Operations Latency",
+ "title": "Backend Operations",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2680,7 +2748,8 @@
},
"yaxes": [
{
- "format": "µs",
+ "decimals": 0,
+ "format": "ops",
"label": null,
"logBase": 1,
"max": null,
@@ -2711,25 +2780,25 @@
"gridPos": {
"h": 6,
"w": 6,
- "x": 12,
+ "x": 18,
"y": 40
},
"height": "",
- "id": 43,
+ "id": 38,
"legend": {
"alignAsTable": false,
- "avg": false,
+ "avg": true,
"current": false,
"hideEmpty": true,
"hideZero": true,
- "max": false,
- "min": false,
- "rightSide": false,
+ "max": true,
+ "min": true,
+ "rightSide": true,
"show": false,
"sort": "avg",
"sortDesc": true,
"total": false,
- "values": false
+ "values": true
},
"lines": true,
"linewidth": 1,
@@ -2746,17 +2815,17 @@
"steppedLine": false,
"targets": [
{
- "expr": "service_center_notify_subscriber_total{job=\"service-center\"}",
+ "expr": "max(avg_over_time(service_center_db_backend_operation_durations_microseconds{job=\"service-center\"}[1m])) by (instance,operation)",
"format": "time_series",
"intervalFactor": 1,
- "legendFormat": "{{instance}}> {{domain}} {{scheme}}",
+ "legendFormat": "{{instance}}> {{operation}}",
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Subscribers",
+ "title": "Backend Operations Latency",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2773,8 +2842,7 @@
},
"yaxes": [
{
- "decimals": 0,
- "format": "none",
+ "format": "µs",
"label": null,
"logBase": 1,
"max": null,
@@ -3434,7 +3502,7 @@
"list": []
},
"time": {
- "from": "now-5m",
+ "from": "now-30m",
"to": "now"
},
"timepicker": {
@@ -3465,5 +3533,5 @@
"timezone": "",
"title": "ServiceCenter",
"uid": "Zg6NoHGiz",
- "version": 12
+ "version": 17
}
\ No newline at end of file
diff --git a/integration/instances_test.go b/integration/instances_test.go
index d4fd6b0..1f24fc7 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -18,8 +18,13 @@ package integrationtest_test
import (
"encoding/json"
+ "fmt"
+ "github.com/go-chassis/cari/discovery"
+ "github.com/gorilla/websocket"
+ "github.com/stretchr/testify/assert"
"net/http"
"strings"
+ "sync"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -601,18 +606,6 @@ var _ = Describe("MicroService Api Test", func() {
Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
})
- It("Call the listwatcher API ", func() {
- //This api gives 400 bad request for the integration test
- // as integration test is not able to make ws connection
- url := strings.Replace(INSTANCELISTWATCHER, ":serviceId", serviceId, 1)
- req, _ := http.NewRequest(GET, SCURL+url, nil)
- req.Header.Set("X-Domain-Name", "default")
- resp, err := scclient.Do(req)
- Expect(err).To(BeNil())
- defer resp.Body.Close()
-
- Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
- })
})
})
@@ -705,3 +698,119 @@ func BenchmarkRegisterMicroServiceInstance(b *testing.B) {
Expect(resp.StatusCode).To(Equal(http.StatusOK))
}
}
+
+func BenchmarkInstanceWatch(t *testing.B) {
+ scclient = insecurityConnection
+ var serviceId, instanceId string
+
+ t.Run("prepare data", func(t *testing.B) {
+ // service
+ serviceName := "testInstance" + strconv.Itoa(rand.Int())
+ servicemap := map[string]interface{}{
+ "serviceName": serviceName,
+ "appId": "testApp",
+ "version": "1.0",
+ }
+ bodyParams := map[string]interface{}{
+ "service": servicemap,
+ }
+ body, _ := json.Marshal(bodyParams)
+ bodyBuf := bytes.NewReader(body)
+ req, _ := http.NewRequest(POST, SCURL+REGISTERMICROSERVICE, bodyBuf)
+ req.Header.Set("X-Domain-Name", "default")
+ resp, err := scclient.Do(req)
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusOK, resp.StatusCode)
+ respbody, _ := ioutil.ReadAll(resp.Body)
+ serviceId = gojson.Json(string(respbody)).Get("serviceId").Tostring()
+ resp.Body.Close()
+
+ // instance
+ healthcheck := map[string]interface{}{
+ "mode": "push",
+ "interval": 30000,
+ "times": 20000,
+ }
+ instance := map[string]interface{}{
+ "hostName": "cse",
+ "healthCheck": healthcheck,
+ }
+ bodyParams = map[string]interface{}{
+ "instance": instance,
+ }
+ body, _ = json.Marshal(bodyParams)
+ bodyBuf = bytes.NewReader(body)
+ req, _ = http.NewRequest(POST, SCURL+strings.Replace(REGISTERINSTANCE, ":serviceId", serviceId, 1), bodyBuf)
+ req.Header.Set("X-Domain-Name", "default")
+ resp, err = scclient.Do(req)
+ assert.NoError(t, err)
+ respbody, _ = ioutil.ReadAll(resp.Body)
+ instanceId = gojson.Json(string(respbody)).Get("instanceId").Tostring()
+ resp.Body.Close()
+
+ req, _ = http.NewRequest(GET, SCURL+FINDINSTANCE+"?appId=testApp&serviceName="+serviceName+"&version=1.0", nil)
+ req.Header.Set("X-Domain-Name", "default")
+ req.Header.Set("X-ConsumerId", serviceId)
+ resp, err = scclient.Do(req)
+ assert.NoError(t, err)
+ resp.Body.Close()
+
+ assert.Equal(t, http.StatusOK, resp.StatusCode)
+ })
+
+ t.Run("test 10K connection", func(t *testing.B) {
+
+ const N, E = 2500, 2500
+ var okWg sync.WaitGroup
+ okWg.Add(N)
+
+ t.Run("new 10K connection", func(t *testing.B) {
+ url := strings.ReplaceAll(strings.ReplaceAll(SCURL, "http://", "ws://")+INSTANCEWATCHER, ":serviceId", serviceId)
+ for i := 0; i < N; i++ {
+ go func() {
+ conn, _, err := websocket.DefaultDialer.Dial(url, nil)
+ assert.NoError(t, err)
+ for {
+ _ = conn.SetReadDeadline(time.Now().Add(30 * time.Second))
+ _, data, err := conn.ReadMessage()
+ if err != nil {
+ okWg.Done()
+ return
+ }
+
+ var response discovery.WatchInstanceResponse
+ _ = json.Unmarshal(data, &response)
+ instance := response.Instance
+ timestamp, _ := strconv.ParseInt(instance.ModTimestamp, 10, 64)
+ sub := time.Now().Sub(time.Unix(timestamp, 0))
+ fmt.Println(instance.Properties["tag"], sub)
+ }
+ }()
+ }
+ <-time.After(10 * time.Second)
+ })
+
+ t.Run("fire 10K event", func(t *testing.B) {
+ for i := 0; i < E; i++ {
+ propertiesInstance := map[string]interface{}{
+ "tag": strconv.Itoa(i),
+ }
+ bodyParams := map[string]interface{}{
+ "properties": propertiesInstance,
+ }
+ url := strings.Replace(UPDATEINSTANCEMETADATA, ":serviceId", serviceId, 1)
+ url = strings.Replace(url, ":instanceId", instanceId, 1)
+ body, _ := json.Marshal(bodyParams)
+ bodyBuf := bytes.NewReader(body)
+ req, _ := http.NewRequest(UPDATE, SCURL+url, bodyBuf)
+ req.Header.Set("X-Domain-Name", "default")
+ resp, _ := scclient.Do(req)
+ resp.Body.Close()
+ }
+ })
+
+ t.Run("wait", func(t *testing.B) {
+ okWg.Wait()
+ })
+ })
+}
diff --git a/pkg/event/bus.go b/pkg/event/bus.go
index a573381..4e7fe77 100644
--- a/pkg/event/bus.go
+++ b/pkg/event/bus.go
@@ -41,14 +41,14 @@ func (bus *Bus) Fire(evt Event) {
bus.Add(queue.Task{Payload: evt})
}
-func (bus *Bus) Handle(ctx context.Context, evt interface{}) {
- bus.fireAtOnce(evt.(Event))
+func (bus *Bus) Handle(ctx context.Context, payload interface{}) {
+ bus.fireAtOnce(payload.(Event))
}
func (bus *Bus) fireAtOnce(evt Event) {
if itf, ok := bus.subjects.Get(evt.Subject()); ok {
itf.(*Poster).Post(evt)
- }
+ } // else the evt will be discarded
}
func (bus *Bus) Subjects(name string) *Poster {
diff --git a/pkg/event/subscriber.go b/pkg/event/subscriber.go
index b26060b..df771dd 100644
--- a/pkg/event/subscriber.go
+++ b/pkg/event/subscriber.go
@@ -31,12 +31,15 @@ type Subscriber interface {
Bus() *BusService
SetBus(*BusService)
+ // Err returns the subscriber's error; the event bus removes the subscriber automatically if it is not nil.
+ // Implementations of OnMessage should call SetError when they hit an exception.
Err() error
SetError(err error)
Close()
+ // OnAccept is called when the subscriber has been added to the event bus successfully
OnAccept()
- // The event bus will callback this function, so it must be non-blocked.
+ // OnMessage is called when the event bus fires a message; it must be non-blocking
OnMessage(Event)
}
diff --git a/pkg/proto/service_ex.go b/pkg/proto/service_ex.go
index ce955d3..64c0b9e 100644
--- a/pkg/proto/service_ex.go
+++ b/pkg/proto/service_ex.go
@@ -30,7 +30,6 @@ type ServiceInstanceCtrlServerEx interface {
BatchFind(ctx context.Context, in *discovery.BatchFindInstancesRequest) (*discovery.BatchFindInstancesResponse, error)
WebSocketWatch(ctx context.Context, in *discovery.WatchInstanceRequest, conn *websocket.Conn)
- WebSocketListAndWatch(ctx context.Context, in *discovery.WatchInstanceRequest, conn *websocket.Conn)
ClusterHealth(ctx context.Context) (*discovery.GetInstancesResponse, error)
}
diff --git a/pkg/util/context.go b/pkg/util/context.go
index b77bbc2..2f5b621 100644
--- a/pkg/util/context.go
+++ b/pkg/util/context.go
@@ -20,6 +20,7 @@ package util
import (
"context"
"net/http"
+ "strings"
"time"
)
@@ -28,6 +29,7 @@ const (
CtxProject CtxKey = "project"
CtxTargetDomain CtxKey = "target-domain"
CtxTargetProject CtxKey = "target-project"
+ SPLIT = "/"
)
type StringContext struct {
@@ -119,11 +121,11 @@ func SetRequestContext(r *http.Request, key CtxKey, val interface{}) *http.Reque
}
func ParseDomainProject(ctx context.Context) string {
- return ParseDomain(ctx) + "/" + ParseProject(ctx)
+ return ParseDomain(ctx) + SPLIT + ParseProject(ctx)
}
func ParseTargetDomainProject(ctx context.Context) string {
- return ParseTargetDomain(ctx) + "/" + ParseTargetProject(ctx)
+ return ParseTargetDomain(ctx) + SPLIT + ParseTargetProject(ctx)
}
func ParseDomain(ctx context.Context) string {
@@ -178,6 +180,14 @@ func SetDomainProject(ctx context.Context, domain string, project string) contex
return SetProject(SetDomain(ctx, domain), project)
}
+func SetDomainProjectString(ctx context.Context, domainProject string) context.Context {
+ arr := strings.Split(domainProject, SPLIT)
+ if len(arr) != 2 {
+ return ctx
+ }
+ return SetProject(SetDomain(ctx, arr[0]), arr[1])
+}
+
func SetTargetDomainProject(ctx context.Context, domain string, project string) context.Context {
return SetTargetProject(SetTargetDomain(ctx, domain), project)
}
diff --git a/server/connection/grpc/stream.go b/server/connection/grpc/stream.go
index b74acc3..b786c8f 100644
--- a/server/connection/grpc/stream.go
+++ b/server/connection/grpc/stream.go
@@ -28,12 +28,11 @@ import (
"github.com/apache/servicecomb-service-center/server/connection"
"github.com/apache/servicecomb-service-center/server/event"
"github.com/apache/servicecomb-service-center/server/metrics"
- pb "github.com/go-chassis/cari/discovery"
)
const GRPC = "gRPC"
-func Handle(watcher *event.InstanceEventListWatcher, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
+func Handle(watcher *event.InstanceSubscriber, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
timer := time.NewTimer(connection.HeartbeatInterval)
defer timer.Stop()
for {
@@ -69,10 +68,10 @@ func Handle(watcher *event.InstanceEventListWatcher, stream proto.ServiceInstanc
}
}
-func ListAndWatch(ctx context.Context, serviceID string, f func() ([]*pb.WatchInstanceResponse, int64), stream proto.ServiceInstanceCtrlWatchServer) (err error) {
+func Watch(ctx context.Context, serviceID string, stream proto.ServiceInstanceCtrlWatchServer) (err error) {
domainProject := util.ParseDomainProject(ctx)
domain := util.ParseDomain(ctx)
- watcher := event.NewInstanceEventListWatcher(serviceID, domainProject, f)
+ watcher := event.NewInstanceSubscriber(serviceID, domainProject)
err = event.Center().AddSubscriber(watcher)
if err != nil {
return
diff --git a/server/connection/grpc/stream_test.go b/server/connection/grpc/stream_test.go
index 3381167..3bb7d78 100644
--- a/server/connection/grpc/stream_test.go
+++ b/server/connection/grpc/stream_test.go
@@ -42,7 +42,7 @@ func (x *grpcWatchServer) Context() context.Context {
}
func TestHandleWatchJob(t *testing.T) {
- w := event.NewInstanceEventListWatcher("g", "s", nil)
+ w := event.NewInstanceSubscriber("g", "s")
w.Job <- nil
err := stream.Handle(w, &grpcWatchServer{})
if err == nil {
@@ -55,6 +55,6 @@ func TestHandleWatchJob(t *testing.T) {
func TestDoStreamListAndWatch(t *testing.T) {
defer log.Recover()
- err := stream.ListAndWatch(context.Background(), "s", nil, nil)
+ err := stream.Watch(context.Background(), "s", nil)
t.Fatal("TestDoStreamListAndWatch failed", err)
}
diff --git a/server/connection/ws/broker.go b/server/connection/ws/broker.go
new file mode 100644
index 0000000..1501945
--- /dev/null
+++ b/server/connection/ws/broker.go
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ws
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/event"
+ "github.com/apache/servicecomb-service-center/server/metrics"
+ pb "github.com/go-chassis/cari/discovery"
+)
+
+var errChanClosed = fmt.Errorf("chan closed")
+
+type Broker struct {
+ consumer *WebSocket
+ producer *event.InstanceSubscriber
+}
+
+func (b *Broker) Listen(ctx context.Context) error {
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case instanceEvent, ok := <-b.producer.Job:
+ if !ok {
+ return errChanClosed
+ }
+ err := b.write(instanceEvent)
+ if err != nil {
+ return err
+ }
+ }
+ }
+}
+func (b *Broker) write(evt *event.InstanceEvent) error {
+ resp := evt.Response
+ providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version)
+ if resp.Action != string(pb.EVT_EXPIRE) {
+ providerFlag = fmt.Sprintf("%s/%s(%s)", resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
+ }
+ remoteAddr := b.consumer.Conn.RemoteAddr().String()
+ log.Infof("event[%s] is coming in, subscriber[%s] watch %s, group: %s",
+ resp.Action, remoteAddr, providerFlag, b.producer.Group())
+
+ resp.Response = nil
+ data, err := json.Marshal(resp)
+ if err != nil {
+ log.Errorf(err, "subscriber[%s] watch %s, group: %s", remoteAddr, providerFlag, b.producer.Group())
+ data = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output file error, %s", err.Error()))
+ }
+ err = b.consumer.WriteTextMessage(data)
+ metrics.ReportPublishCompleted(evt, err)
+ return err
+}
+
+func NewBroker(ws *WebSocket, is *event.InstanceSubscriber) *Broker {
+ return &Broker{
+ consumer: ws,
+ producer: is,
+ }
+}
diff --git a/pkg/proto/service_ex.go b/server/connection/ws/broker_test.go
similarity index 55%
copy from pkg/proto/service_ex.go
copy to server/connection/ws/broker_test.go
index ce955d3..a3597c3 100644
--- a/pkg/proto/service_ex.go
+++ b/server/connection/ws/broker_test.go
@@ -15,22 +15,28 @@
* limitations under the License.
*/
-package proto
+package ws_test
import (
"context"
-
- "github.com/go-chassis/cari/discovery"
- "github.com/gorilla/websocket"
+ "github.com/apache/servicecomb-service-center/server/connection/ws"
+ "github.com/apache/servicecomb-service-center/server/event"
+ "github.com/stretchr/testify/assert"
+ "testing"
)
-type ServiceInstanceCtrlServerEx interface {
- ServiceInstanceCtrlServer
-
- BatchFind(ctx context.Context, in *discovery.BatchFindInstancesRequest) (*discovery.BatchFindInstancesResponse, error)
+func TestNewBroker(t *testing.T) {
+ t.Run("should not return nil when new broker", func(t *testing.T) {
+ assert.NotNil(t, ws.NewBroker(nil, nil))
- WebSocketWatch(ctx context.Context, in *discovery.WatchInstanceRequest, conn *websocket.Conn)
- WebSocketListAndWatch(ctx context.Context, in *discovery.WatchInstanceRequest, conn *websocket.Conn)
+ })
+}
- ClusterHealth(ctx context.Context) (*discovery.GetInstancesResponse, error)
+func TestBroker_Listen(t *testing.T) {
+ t.Run("should return err when listen context cancelled", func(t *testing.T) {
+ broker := ws.NewBroker(nil, event.NewInstanceSubscriber("", ""))
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+ assert.Equal(t, context.Canceled, broker.Listen(ctx))
+ })
}
diff --git a/server/connection/ws/common.go b/server/connection/ws/common.go
new file mode 100644
index 0000000..d9e709c
--- /dev/null
+++ b/server/connection/ws/common.go
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ws
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/apache/servicecomb-service-center/pkg/gopool"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/event"
+ "github.com/apache/servicecomb-service-center/server/metrics"
+ "github.com/gorilla/websocket"
+)
+
+func Watch(ctx context.Context, serviceID string, conn *websocket.Conn) {
+ domainProject := util.ParseDomainProject(ctx)
+ domain := util.ParseDomain(ctx)
+
+ ws := NewWebSocket(domainProject, serviceID, conn)
+ HealthChecker().Accept(ws)
+
+ subscriber := event.NewInstanceSubscriber(serviceID, domainProject)
+ err := event.Center().AddSubscriber(subscriber)
+ if err != nil {
+ SendEstablishError(conn, err)
+ return
+ }
+
+ metrics.ReportSubscriber(domain, Websocket, 1)
+ defer metrics.ReportSubscriber(domain, Websocket, -1)
+
+ pool := gopool.New(ctx).Do(func(ctx context.Context) {
+ if err := NewBroker(ws, subscriber).Listen(ctx); err != nil {
+ log.Error(fmt.Sprintf("[%s] listen service[%s] failed", conn.RemoteAddr(), serviceID), err)
+ }
+ })
+ defer pool.Done()
+
+ if err := ws.ReadMessage(); err != nil {
+ log.Error(fmt.Sprintf("read subscriber[%s][%s] message failed", serviceID, conn.RemoteAddr()), err)
+ subscriber.SetError(err)
+ }
+}
+
+func SendEstablishError(conn *websocket.Conn, err error) {
+ remoteAddr := conn.RemoteAddr().String()
+ log.Errorf(err, "establish[%s] websocket watch failed.", remoteAddr)
+ if err := conn.WriteMessage(websocket.TextMessage, util.StringToBytesWithNoCopy(err.Error())); err != nil {
+ log.Errorf(err, "establish[%s] websocket watch failed: write message failed.", remoteAddr)
+ }
+}
diff --git a/pkg/proto/service_ex.go b/server/connection/ws/common_test.go
similarity index 53%
copy from pkg/proto/service_ex.go
copy to server/connection/ws/common_test.go
index ce955d3..df8ae55 100644
--- a/pkg/proto/service_ex.go
+++ b/server/connection/ws/common_test.go
@@ -15,22 +15,33 @@
* limitations under the License.
*/
-package proto
+package ws_test
import (
"context"
+ "errors"
+ "testing"
- "github.com/go-chassis/cari/discovery"
- "github.com/gorilla/websocket"
+ wss "github.com/apache/servicecomb-service-center/server/connection/ws"
+ "github.com/stretchr/testify/assert"
)
-type ServiceInstanceCtrlServerEx interface {
- ServiceInstanceCtrlServer
-
- BatchFind(ctx context.Context, in *discovery.BatchFindInstancesRequest) (*discovery.BatchFindInstancesResponse, error)
-
- WebSocketWatch(ctx context.Context, in *discovery.WatchInstanceRequest, conn *websocket.Conn)
- WebSocketListAndWatch(ctx context.Context, in *discovery.WatchInstanceRequest, conn *websocket.Conn)
+func TestSendEstablishError(t *testing.T) {
+ mock := NewTest()
+ t.Run("should read the err when call", func(t *testing.T) {
+ wss.SendEstablishError(mock.ServerConn, errors.New("error"))
+ _, message, err := mock.ClientConn.ReadMessage()
+ assert.Nil(t, err)
+ assert.Equal(t, "error", string(message))
+ })
+}
- ClusterHealth(ctx context.Context) (*discovery.GetInstancesResponse, error)
+func TestWatch(t *testing.T) {
+ t.Run("should return when ctx cancelled", func(t *testing.T) {
+ mock := NewTest()
+ mock.ServerConn.Close()
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+ wss.Watch(ctx, "", mock.ServerConn)
+ })
}
diff --git a/server/connection/ws/publisher.go b/server/connection/ws/health_check.go
similarity index 57%
rename from server/connection/ws/publisher.go
rename to server/connection/ws/health_check.go
index 267ff24..aeb4f38 100644
--- a/server/connection/ws/publisher.go
+++ b/server/connection/ws/health_check.go
@@ -19,40 +19,36 @@ package ws
import (
"context"
+ "fmt"
"sync"
"time"
"github.com/apache/servicecomb-service-center/pkg/gopool"
+ "github.com/apache/servicecomb-service-center/pkg/log"
)
-var publisher *Publisher
+var checker *HealthCheck
func init() {
- publisher = NewPublisher()
- publisher.Run()
+ checker = NewHealthCheck()
+ checker.Run()
}
-type Publisher struct {
+type HealthCheck struct {
wss []*WebSocket
lock sync.Mutex
goroutine *gopool.Pool
}
-func (wh *Publisher) Run() {
- gopool.Go(publisher.loop)
+func (wh *HealthCheck) Run() {
+ gopool.Go(checker.loop)
}
-func (wh *Publisher) Stop() {
+func (wh *HealthCheck) Stop() {
wh.goroutine.Close(true)
}
-func (wh *Publisher) dispatch(ws *WebSocket, payload interface{}) {
- wh.goroutine.Do(func(ctx context.Context) {
- ws.HandleEvent(payload)
- })
-}
-
-func (wh *Publisher) loop(ctx context.Context) {
+func (wh *HealthCheck) loop(ctx context.Context) {
defer wh.Stop()
ticker := time.NewTicker(500 * time.Millisecond)
for {
@@ -61,49 +57,52 @@ func (wh *Publisher) loop(ctx context.Context) {
// server shutdown
return
case <-ticker.C:
- var removes []int
- for i, ws := range wh.wss {
- if payload := ws.Pick(); payload != nil {
- if _, ok := payload.(error); ok {
- removes = append(removes, i)
- }
- wh.dispatch(ws, payload)
+ for _, ws := range wh.wss {
+ if t := ws.NeedCheck(); t == nil {
+ continue
}
+ wh.check(ws)
}
- if len(removes) == 0 {
- continue
- }
-
- wh.lock.Lock()
- var (
- news []*WebSocket
- s int
- )
- for _, e := range removes {
- news = append(news, wh.wss[s:e]...)
- s = e + 1
- }
- if s < len(wh.wss) {
- news = append(news, wh.wss[s:]...)
- }
- wh.wss = news
- wh.lock.Unlock()
}
}
}
-func (wh *Publisher) Accept(ws *WebSocket) {
+func (wh *HealthCheck) check(ws *WebSocket) {
+ wh.goroutine.Do(func(ctx context.Context) {
+ if err := ws.CheckHealth(ctx); err != nil {
+ wh.Remove(ws)
+ log.Error(fmt.Sprintf("checker removed unhealth websocket[%s]", ws.RemoteAddr), err)
+ }
+ })
+}
+
+func (wh *HealthCheck) Accept(ws *WebSocket) int {
wh.lock.Lock()
wh.wss = append(wh.wss, ws)
+ n := len(wh.wss)
+ wh.lock.Unlock()
+ return n
+}
+
+func (wh *HealthCheck) Remove(ws *WebSocket) int {
+ wh.lock.Lock()
+ for i, t := range wh.wss {
+ if t == ws {
+ wh.wss = append(wh.wss[0:i], wh.wss[i+1:]...)
+ break
+ }
+ }
+ n := len(wh.wss)
wh.lock.Unlock()
+ return n
}
-func NewPublisher() *Publisher {
- return &Publisher{
+func NewHealthCheck() *HealthCheck {
+ return &HealthCheck{
goroutine: gopool.New(context.Background()),
}
}
-func Instance() *Publisher {
- return publisher
+func HealthChecker() *HealthCheck {
+ return checker
}
diff --git a/server/rest/controller/v3/instance_watcher.go b/server/connection/ws/health_check_test.go
similarity index 58%
copy from server/rest/controller/v3/instance_watcher.go
copy to server/connection/ws/health_check_test.go
index 7395c4c..7ab0578 100644
--- a/server/rest/controller/v3/instance_watcher.go
+++ b/server/connection/ws/health_check_test.go
@@ -14,20 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package v3
+
+package ws_test
import (
- "github.com/apache/servicecomb-service-center/pkg/rest"
- "github.com/apache/servicecomb-service-center/server/rest/controller/v4"
+ "github.com/apache/servicecomb-service-center/server/connection/ws"
+ "github.com/stretchr/testify/assert"
+ "testing"
)
-type WatchService struct {
- v4.WatchService
+func TestNewHealthCheck(t *testing.T) {
+ t.Run("should not return nil when new", func(t *testing.T) {
+ assert.NotNil(t, ws.NewHealthCheck())
+ })
}
-func (this *WatchService) URLPatterns() []rest.Route {
- return []rest.Route{
- {rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/watcher", this.Watch},
- {rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/listwatcher", this.ListAndWatch},
- }
+func TestHealthCheck_Run(t *testing.T) {
+ mock := NewTest()
+
+ t.Run("should return 1 when accept one ws", func(t *testing.T) {
+ check := ws.NewHealthCheck()
+ webSocket := ws.NewWebSocket("", "", mock.ServerConn)
+ assert.Equal(t, 1, check.Accept(webSocket))
+ assert.Equal(t, 0, check.Remove(webSocket))
+ })
}
diff --git a/server/rest/controller/v3/instance_watcher.go b/server/connection/ws/options.go
similarity index 64%
copy from server/rest/controller/v3/instance_watcher.go
copy to server/connection/ws/options.go
index 7395c4c..3196623 100644
--- a/server/rest/controller/v3/instance_watcher.go
+++ b/server/connection/ws/options.go
@@ -14,20 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package v3
+
+package ws
import (
- "github.com/apache/servicecomb-service-center/pkg/rest"
- "github.com/apache/servicecomb-service-center/server/rest/controller/v4"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/server/connection"
)
-type WatchService struct {
- v4.WatchService
+type Options struct {
+ ReadTimeout time.Duration
+ SendTimeout time.Duration
+ HealthInterval time.Duration
}
-func (this *WatchService) URLPatterns() []rest.Route {
- return []rest.Route{
- {rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/watcher", this.Watch},
- {rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/listwatcher", this.ListAndWatch},
+func ToOptions() Options {
+ return Options{
+ ReadTimeout: connection.ReadTimeout,
+ SendTimeout: connection.SendTimeout,
+ HealthInterval: connection.HeartbeatInterval,
}
}
diff --git a/server/connection/ws/websocket.go b/server/connection/ws/websocket.go
index 57b064c..e9fe3c1 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -19,311 +19,190 @@ package ws
import (
"context"
- "encoding/json"
"fmt"
+ "github.com/apache/servicecomb-service-center/pkg/util"
"time"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/connection"
- "github.com/apache/servicecomb-service-center/server/event"
- "github.com/apache/servicecomb-service-center/server/metrics"
pb "github.com/go-chassis/cari/discovery"
"github.com/gorilla/websocket"
)
const Websocket = "Websocket"
-type WebSocket struct {
- ctx context.Context
- ticker *time.Ticker
- conn *websocket.Conn
- // watcher subscribe the notification service event
- watcher *event.InstanceEventListWatcher
- needPingWatcher bool
- free chan struct{}
- closed chan struct{}
-}
-
-func (wh *WebSocket) Init() error {
- wh.ticker = time.NewTicker(connection.HeartbeatInterval)
- wh.needPingWatcher = true
- wh.free = make(chan struct{}, 1)
- wh.closed = make(chan struct{})
+var errServiceNotExist = fmt.Errorf("Service does not exist.")
- wh.SetReady()
-
- remoteAddr := wh.conn.RemoteAddr().String()
-
- // put in notification service queue
- if err := event.Center().AddSubscriber(wh.watcher); err != nil {
- err = fmt.Errorf("establish[%s] websocket watch failed: notify service error, %s",
- remoteAddr, err.Error())
- log.Errorf(nil, err.Error())
-
- werr := wh.conn.WriteMessage(websocket.TextMessage, util.StringToBytesWithNoCopy(err.Error()))
- if werr != nil {
- log.Errorf(werr, "establish[%s] websocket watch failed: write message failed.", remoteAddr)
- }
- return err
- }
-
- // put in publisher queue
- Instance().Accept(wh)
+type WebSocket struct {
+ Options
+ Conn *websocket.Conn
+ RemoteAddr string
+ DomainProject string
+ ConsumerID string
- log.Debugf("start watching instance status, watcher[%s], subject: %s, group: %s",
- remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
- return nil
+ ticker *time.Ticker
+ needPing bool
+ idleCh chan struct{}
}
-func (wh *WebSocket) ReadTimeout() time.Duration {
- return connection.ReadTimeout
-}
+func (wh *WebSocket) Init() {
+ wh.RemoteAddr = wh.Conn.RemoteAddr().String()
+ wh.ticker = time.NewTicker(wh.HealthInterval)
+ wh.needPing = true
+ wh.idleCh = make(chan struct{}, 1)
-func (wh *WebSocket) SendTimeout() time.Duration {
- return connection.SendTimeout
-}
+ wh.registerMessageHandler()
-func (wh *WebSocket) Heartbeat(messageType int) error {
- err := wh.conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout()))
- if err != nil {
- messageTypeName := "Ping"
- if messageType == websocket.PongMessage {
- messageTypeName = "Pong"
- }
- log.Errorf(err, "fail to send '%s' to watcher[%s], subject: %s, group: %s",
- messageTypeName, wh.conn.RemoteAddr(), wh.watcher.Subject(), wh.watcher.Group())
- //wh.watcher.SetError(err)
- return err
- }
- return nil
+ wh.SetIdle()
+
+ log.Debugf("start watching instance status, subscriber[%s], consumer: %s",
+ wh.RemoteAddr, wh.ConsumerID)
}
-func (wh *WebSocket) HandleControlMessage() {
- remoteAddr := wh.conn.RemoteAddr().String()
+func (wh *WebSocket) registerMessageHandler() {
+ remoteAddr := wh.RemoteAddr
// PING
- wh.conn.SetPingHandler(func(message string) error {
+ wh.Conn.SetPingHandler(func(message string) error {
defer func() {
- err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+ err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
if err != nil {
log.Error("", err)
}
}()
- if wh.needPingWatcher {
- log.Infof("received 'Ping' message '%s' from watcher[%s], no longer send 'Ping' to it, subject: %s, group: %s",
- message, remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+ if wh.needPing {
+ log.Infof("received 'Ping' message '%s' from subscriber[%s], no longer send 'Ping' to it, consumer: %s",
+ message, remoteAddr, wh.ConsumerID)
}
- wh.needPingWatcher = false
- return wh.Heartbeat(websocket.PongMessage)
+ wh.needPing = false
+ return wh.WritePingPong(websocket.PongMessage)
})
// PONG
- wh.conn.SetPongHandler(func(message string) error {
+ wh.Conn.SetPongHandler(func(message string) error {
defer func() {
- err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+ err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
if err != nil {
log.Error("", err)
}
}()
- log.Debugf("received 'Pong' message '%s' from watcher[%s], subject: %s, group: %s",
- message, remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+ log.Debugf("received 'Pong' message '%s' from subscriber[%s], consumer: %s",
+ message, remoteAddr, wh.ConsumerID)
return nil
})
// CLOSE
- wh.conn.SetCloseHandler(func(code int, text string) error {
- log.Infof("watcher[%s] active closed, code: %d, message: '%s', subject: %s, group: %s",
- remoteAddr, code, text, wh.watcher.Subject(), wh.watcher.Group())
+ wh.Conn.SetCloseHandler(func(code int, text string) error {
+ log.Infof("subscriber[%s] active closed, code: %d, message: '%s', consumer: %s",
+ remoteAddr, code, text, wh.ConsumerID)
return wh.sendClose(code, text)
})
+}
- wh.conn.SetReadLimit(connection.ReadMaxBody)
- err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+func (wh *WebSocket) ReadMessage() error {
+ wh.Conn.SetReadLimit(connection.ReadMaxBody)
+ err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
if err != nil {
log.Error("", err)
}
for {
- _, _, err := wh.conn.ReadMessage()
+ _, _, err := wh.Conn.ReadMessage()
if err != nil {
- // client close or conn broken
- wh.watcher.SetError(err)
- return
+ return err
}
}
}
func (wh *WebSocket) sendClose(code int, text string) error {
- remoteAddr := wh.conn.RemoteAddr().String()
+ remoteAddr := wh.Conn.RemoteAddr().String()
var message []byte
if code != websocket.CloseNoStatusReceived {
message = websocket.FormatCloseMessage(code, text)
}
- err := wh.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(wh.SendTimeout()))
+ err := wh.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(wh.SendTimeout))
if err != nil {
- log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: %s",
- remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+ log.Errorf(err, "subscriber[%s] catch an err, consumer: %s",
+ remoteAddr, wh.ConsumerID)
return err
}
return nil
}
-// Pick will be called by publisher
-func (wh *WebSocket) Pick() interface{} {
+// NeedCheck will be called by checker
+func (wh *WebSocket) NeedCheck() interface{} {
select {
- case <-wh.Ready():
- if wh.watcher.Err() != nil {
- return wh.watcher.Err()
- }
-
+ case <-wh.Idle():
select {
case t := <-wh.ticker.C:
return t
- case j := <-wh.watcher.Job:
- if j == nil {
- return fmt.Errorf("server shutdown")
- }
- return j
default:
- // reset if idle
- wh.SetReady()
+ // ticker has not fired yet: hand the idle token back so the next tick can be checked
+ wh.SetIdle()
}
default:
}
-
return nil
}
-// HandleEvent will be called if Pick() returns not nil
-func (wh *WebSocket) HandleEvent(o interface{}) {
- defer wh.SetReady()
-
- var (
- message []byte
- remoteAddr = wh.conn.RemoteAddr().String()
- )
-
- switch o := o.(type) {
- case error:
- log.Errorf(o, "watcher[%s] catch an err, subject: %s, group: %s",
- remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-
- message = util.StringToBytesWithNoCopy(fmt.Sprintf("watcher catch an err: %s", o.Error()))
- case time.Time:
- if exist, err := datasource.Instance().ExistServiceByID(wh.ctx, &pb.GetExistenceByIDRequest{
- ServiceId: wh.watcher.Group(),
- }); err != nil || !exist.Exist {
- message = util.StringToBytesWithNoCopy("Service does not exit.")
- break
- }
-
- if !wh.needPingWatcher {
- return
- }
+// CheckHealth will be called if NeedCheck() returns a non-nil value
+func (wh *WebSocket) CheckHealth(ctx context.Context) error {
+ defer wh.SetIdle()
- if err := wh.Heartbeat(websocket.PingMessage); err != nil {
- log.Errorf(err, "send 'Ping' message to watcher[%s] failed, subject: %s, group: %s",
- remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
- return
- }
-
- log.Debugf("send 'Ping' message to watcher[%s], subject: %s, group: %s",
- remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
- return
- case *event.InstanceEvent:
- resp := o.Response
+ if !wh.needPing {
+ return nil
+ }
- providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version)
- if resp.Action != string(pb.EVT_EXPIRE) {
- providerFlag = fmt.Sprintf("%s/%s(%s)", resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
- }
- log.Infof("event[%s] is coming in, watcher[%s] watch %s, subject: %s, group: %s",
- resp.Action, remoteAddr, providerFlag, wh.watcher.Subject(), wh.watcher.Group())
+ ctx = util.SetDomainProjectString(ctx, wh.DomainProject)
- resp.Response = nil
- data, err := json.Marshal(resp)
- if err != nil {
- log.Errorf(err, "watcher[%s] watch %s, subject: %s, group: %s",
- remoteAddr, providerFlag, o, wh.watcher.Subject(), wh.watcher.Group())
- message = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output file error, %s", err.Error()))
- break
- }
- message = data
- default:
- log.Errorf(nil, "watcher[%s] unknown input %v, subject: %s, group: %s",
- remoteAddr, o, wh.watcher.Subject(), wh.watcher.Group())
- return
+ if exist, err := datasource.Instance().ExistServiceByID(ctx, &pb.GetExistenceByIDRequest{
+ ServiceId: wh.ConsumerID,
+ }); err != nil || !exist.Exist {
+ return errServiceNotExist
}
- select {
- case <-wh.closed:
- return
- default:
+ remoteAddr := wh.Conn.RemoteAddr().String()
+ if err := wh.WritePingPong(websocket.PingMessage); err != nil {
+ return err
}
- err := wh.WriteMessage(message)
- if evt, ok := o.(*event.InstanceEvent); ok {
- metrics.ReportPublishCompleted(evt, err)
- }
- if err != nil {
- log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: %s",
- remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
- }
+ log.Debugf("send 'Ping' message to subscriber[%s], consumer: %s",
+ remoteAddr, wh.ConsumerID)
+ return nil
+}
+
+func (wh *WebSocket) WritePingPong(messageType int) error {
+ return wh.Conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout))
}
-func (wh *WebSocket) WriteMessage(message []byte) error {
- err := wh.conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout()))
+func (wh *WebSocket) WriteTextMessage(message []byte) error {
+ err := wh.Conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout))
if err != nil {
return err
}
- return wh.conn.WriteMessage(websocket.TextMessage, message)
+ err = wh.Conn.WriteMessage(websocket.TextMessage, message)
+ if err != nil {
+ log.Errorf(err, "subscriber[%s] catch an err, msg size: %d",
+ wh.Conn.RemoteAddr().String(), len(message))
+ }
+ return err
}
-func (wh *WebSocket) Ready() <-chan struct{} {
- return wh.free
+func (wh *WebSocket) Idle() <-chan struct{} {
+ return wh.idleCh
}
-func (wh *WebSocket) SetReady() {
+func (wh *WebSocket) SetIdle() {
select {
- case wh.free <- struct{}{}:
+ case wh.idleCh <- struct{}{}:
default:
}
}
-func (wh *WebSocket) Stop() {
- close(wh.closed)
-}
-
-func ListAndWatch(ctx context.Context, serviceID string, f func() ([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
- domainProject := util.ParseDomainProject(ctx)
- domain := util.ParseDomain(ctx)
- socket := New(ctx, conn, event.NewInstanceEventListWatcher(serviceID, domainProject, f))
-
- metrics.ReportSubscriber(domain, Websocket, 1)
- process(socket)
- metrics.ReportSubscriber(domain, Websocket, -1)
-}
-
-func process(socket *WebSocket) {
- if err := socket.Init(); err != nil {
- return
- }
-
- socket.HandleControlMessage()
-
- socket.Stop()
-}
-
-func SendEstablishError(conn *websocket.Conn, err error) {
- remoteAddr := conn.RemoteAddr().String()
- log.Errorf(err, "establish[%s] websocket watch failed.", remoteAddr)
- if err := conn.WriteMessage(websocket.TextMessage, util.StringToBytesWithNoCopy(err.Error())); err != nil {
- log.Errorf(err, "establish[%s] websocket watch failed: write message failed.", remoteAddr)
- }
-}
-
-func New(ctx context.Context, conn *websocket.Conn, watcher *event.InstanceEventListWatcher) *WebSocket {
- return &WebSocket{
- ctx: ctx,
- conn: conn,
- watcher: watcher,
+func NewWebSocket(domainProject, serviceID string, conn *websocket.Conn) *WebSocket {
+ ws := &WebSocket{
+ Options: ToOptions(),
+ DomainProject: domainProject,
+ ConsumerID: serviceID,
+ Conn: conn,
}
+ ws.Init()
+ return ws
}
diff --git a/server/connection/ws/websocket_test.go b/server/connection/ws/websocket_test.go
index 7f21727..ca40bd6 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -16,12 +16,10 @@
*/
package ws_test
-// initialize
import (
_ "github.com/apache/servicecomb-service-center/test"
"context"
- "errors"
"net/http"
"net/http/httptest"
"strings"
@@ -31,104 +29,136 @@ import (
wss "github.com/apache/servicecomb-service-center/server/connection/ws"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/event"
- "github.com/go-chassis/cari/discovery"
"github.com/gorilla/websocket"
+ "github.com/stretchr/testify/assert"
)
var closeCh = make(chan struct{})
-type watcherConn struct {
-}
-
func init() {
testing.Init()
core.Initialize()
}
+
+type watcherConn struct {
+ ClientConn *websocket.Conn
+ ServerConn *websocket.Conn
+}
+
+func (h *watcherConn) Test() {
+ s := httptest.NewServer(h)
+ h.ClientConn, _, _ = websocket.DefaultDialer.Dial(
+ strings.Replace(s.URL, "http://", "ws://", 1), nil)
+}
+
func (h *watcherConn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var upgrader = websocket.Upgrader{}
- conn, _ := upgrader.Upgrade(w, r, nil)
+ h.ServerConn, _ = upgrader.Upgrade(w, r, nil)
for {
- conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
- conn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(time.Second))
- _, _, err := conn.ReadMessage()
+ //h.ServerConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
+ //h.ServerConn.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(time.Second))
+ _, _, err := h.ServerConn.ReadMessage()
if err != nil {
return
}
<-closeCh
- conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
- conn.Close()
+ h.ServerConn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
+ h.ServerConn.Close()
return
}
}
-func TestDoWebSocketListAndWatch(t *testing.T) {
- s := httptest.NewServer(&watcherConn{})
-
- conn, _, _ := websocket.DefaultDialer.Dial(
- strings.Replace(s.URL, "http://", "ws://", 1), nil)
-
- wss.SendEstablishError(conn, errors.New("error"))
+func NewTest() *watcherConn {
+ ts := &watcherConn{}
+ ts.Test()
+ return ts
+}
- w := event.NewInstanceEventListWatcher("g", "s", func() (results []*discovery.WatchInstanceResponse, rev int64) {
- results = append(results, &discovery.WatchInstanceResponse{
- Response: discovery.CreateResponse(discovery.ResponseSuccess, "ok"),
- Action: string(discovery.EVT_CREATE),
- Key: &discovery.MicroServiceKey{},
- Instance: &discovery.MicroServiceInstance{},
- })
- return
+func TestNewWebSocket(t *testing.T) {
+ mock := NewTest()
+ t.Run("should return not nil when new", func(t *testing.T) {
+ assert.NotNil(t, wss.NewWebSocket("", "", mock.ServerConn))
})
+}
- ws := wss.New(context.Background(), conn, w)
- err := ws.Init()
- if err != nil {
- t.Fatalf("TestPublisher_Run")
+func TestWebSocket_NeedCheck(t *testing.T) {
+ mock := NewTest()
+ conn := mock.ServerConn
+ options := wss.ToOptions()
+ webSocket := &wss.WebSocket{
+ Options: options,
+ DomainProject: "default",
+ ConsumerID: "",
+ Conn: conn,
}
- event.Center().Start()
-
- go func() {
- wss.ListAndWatch(context.Background(), "", nil, conn)
-
- w2 := event.NewInstanceEventListWatcher("g", "s", func() (results []*discovery.WatchInstanceResponse, rev int64) {
- return
- })
- ws2 := wss.New(context.Background(), conn, w2)
- err := ws2.Init()
- if err != nil {
- t.Fatalf("TestPublisher_Run")
- }
- }()
-
- go ws.HandleControlMessage()
-
- w.OnMessage(nil)
- w.OnMessage(&event.InstanceEvent{})
-
- event.Center().Fire(event.NewInstanceEvent("g", "s", 1, &discovery.WatchInstanceResponse{
- Response: discovery.CreateResponse(discovery.ResponseSuccess, "ok"),
- Action: string(discovery.EVT_CREATE),
- Key: &discovery.MicroServiceKey{},
- Instance: &discovery.MicroServiceInstance{},
- }))
-
- <-time.After(time.Second)
-
- ws.HandleEvent(nil)
-
- ws.Heartbeat(websocket.PingMessage)
- ws.Heartbeat(websocket.PongMessage)
-
- ws.HandleEvent(time.Now())
+ t.Run("should not check when new", func(t *testing.T) {
+ webSocket.HealthInterval = time.Second
+ webSocket.Init()
+ assert.Nil(t, webSocket.NeedCheck())
+ })
- closeCh <- struct{}{}
+ t.Run("should check when check time up", func(t *testing.T) {
+ webSocket.HealthInterval = time.Microsecond
+ webSocket.Init()
+ <-time.After(time.Microsecond)
+ assert.NotNil(t, webSocket.NeedCheck())
+ })
+ t.Run("should not check when busy", func(t *testing.T) {
+ webSocket.HealthInterval = time.Microsecond
+ webSocket.Init()
+ <-time.After(time.Microsecond)
+ assert.NotNil(t, webSocket.NeedCheck())
+ assert.Nil(t, webSocket.NeedCheck())
+ })
+}
- <-time.After(time.Second)
+func TestWebSocket_Idle(t *testing.T) {
+ mock := NewTest()
+ webSocket := wss.NewWebSocket("", "", mock.ServerConn)
- ws.Heartbeat(websocket.PingMessage)
- ws.Heartbeat(websocket.PongMessage)
+ t.Run("should idle when new", func(t *testing.T) {
+ select {
+ case <-webSocket.Idle():
+ default:
+ assert.Fail(t, "not idle")
+ }
+ })
+ t.Run("should idle when setIdle", func(t *testing.T) {
+ select {
+ case <-webSocket.Idle():
+ assert.Fail(t, "idle")
+ default:
+ webSocket.SetIdle()
+ select {
+ case <-webSocket.Idle():
+ default:
+ assert.Fail(t, "not idle")
+ }
+ }
+ })
+ t.Run("should idle when checkHealth", func(t *testing.T) {
+ _ = webSocket.CheckHealth(context.Background())
+ select {
+ case <-webSocket.Idle():
+ default:
+ assert.Fail(t, "not idle")
+ }
+ })
+}
- w.OnMessage(nil)
+func TestWebSocket_CheckHealth(t *testing.T) {
+ mock := NewTest()
+ event.Center().Start()
- wss.Instance().Stop()
+ t.Run("should do nothing when recv PING", func(t *testing.T) {
+ ws := wss.NewWebSocket("", "", mock.ServerConn)
+ mock.ClientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
+ <-time.After(time.Second)
+ assert.Nil(t, ws.CheckHealth(context.Background()))
+ })
+ t.Run("should return err when consumer not exist", func(t *testing.T) {
+ ws := wss.NewWebSocket("", "", mock.ServerConn)
+ assert.Equal(t, "Service does not exist.", ws.CheckHealth(context.Background()).Error())
+ })
}
diff --git a/server/event/instance_event.go b/server/event/instance_event.go
new file mode 100644
index 0000000..74f30fc
--- /dev/null
+++ b/server/event/instance_event.go
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "github.com/apache/servicecomb-service-center/pkg/event"
+ simple "github.com/apache/servicecomb-service-center/pkg/time"
+ pb "github.com/go-chassis/cari/discovery"
+)
+
+const QueueSize = 5000
+
+var INSTANCE = event.RegisterType("INSTANCE", QueueSize)
+
+// InstanceEvent is the pushed notification of a status change
+type InstanceEvent struct {
+ event.Event
+ Revision int64
+ Response *pb.WatchInstanceResponse
+}
+
+func NewInstanceEvent(serviceID, domainProject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent {
+ return &InstanceEvent{
+ Event: event.NewEvent(INSTANCE, domainProject, serviceID),
+ Revision: rev,
+ Response: response,
+ }
+}
+
+func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
+ return &InstanceEvent{
+ Event: event.NewEventWithTime(INSTANCE, domainProject, serviceID, createAt),
+ Revision: rev,
+ Response: response,
+ }
+}
diff --git a/server/event/instance_subscriber.go b/server/event/instance_subscriber.go
index f61f622..9d5c1c4 100644
--- a/server/event/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -18,39 +18,20 @@
package event
import (
- "context"
- "time"
-
+ "fmt"
"github.com/apache/servicecomb-service-center/pkg/event"
- "github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
- simple "github.com/apache/servicecomb-service-center/pkg/time"
- pb "github.com/go-chassis/cari/discovery"
-)
-
-const (
- AddJobTimeout = 1 * time.Second
- EventQueueSize = 5000
+ "github.com/apache/servicecomb-service-center/server/metrics"
)
-var INSTANCE = event.RegisterType("INSTANCE", EventQueueSize)
-
-// InstanceEvent is the event pushed when an instance's status changes.
-type InstanceEvent struct {
- event.Event
- Revision int64
- Response *pb.WatchInstanceResponse
-}
+var errBusy = fmt.Errorf("too busy")
-type InstanceEventListWatcher struct {
+type InstanceSubscriber struct {
event.Subscriber
- Job chan *InstanceEvent
- ListRevision int64
- ListFunc func() (results []*pb.WatchInstanceResponse, rev int64)
- listCh chan struct{}
+ Job chan *InstanceEvent
}
-func (w *InstanceEventListWatcher) SetError(err error) {
+func (w *InstanceSubscriber) SetError(err error) {
w.Subscriber.SetError(err)
// trigger the cleanup job
e := w.Bus().Fire(event.NewUnhealthyEvent(w))
@@ -59,108 +40,64 @@ func (w *InstanceEventListWatcher) SetError(err error) {
}
}
-func (w *InstanceEventListWatcher) OnAccept() {
+func (w *InstanceSubscriber) OnAccept() {
if w.Err() != nil {
return
}
log.Debugf("accepted by event service, %s watcher %s %s", w.Type(), w.Group(), w.Subject())
- gopool.Go(w.listAndPublishJobs)
-}
-
-func (w *InstanceEventListWatcher) listAndPublishJobs(_ context.Context) {
- defer close(w.listCh)
- if w.ListFunc == nil {
- return
- }
- results, rev := w.ListFunc()
- w.ListRevision = rev
- for _, response := range results {
- w.sendMessage(NewInstanceEvent(w.Group(), w.Subject(), w.ListRevision, response))
- }
}
// OnMessage is called when the subscriber is notified of an event.
-func (w *InstanceEventListWatcher) OnMessage(job event.Event) {
+func (w *InstanceSubscriber) OnMessage(evt event.Event) {
if w.Err() != nil {
return
}
- wJob, ok := job.(*InstanceEvent)
+ wJob, ok := evt.(*InstanceEvent)
if !ok {
return
}
-
- select {
- case <-w.listCh:
- default:
- timer := time.NewTimer(w.Timeout())
- select {
- case <-w.listCh:
- timer.Stop()
- case <-timer.C:
- log.Errorf(nil,
- "the %s listwatcher %s %s is not ready[over %s], send the event %v",
- w.Type(), w.Group(), w.Subject(), w.Timeout(), job)
- }
- }
-
- // the negative revision is specially for mongo scene,should be removed after mongo support revison.
- if wJob.Revision >= 0 && wJob.Revision <= w.ListRevision {
- log.Warnf("unexpected event %s job is coming in, watcher %s %s, job is %v, current revision is %v",
- w.Type(), w.Group(), w.Subject(), job, w.ListRevision)
- return
- }
w.sendMessage(wJob)
}
-func (w *InstanceEventListWatcher) sendMessage(job *InstanceEvent) {
+func (w *InstanceSubscriber) sendMessage(evt *InstanceEvent) {
defer log.Recover()
+
+ metrics.ReportPendingCompleted(evt)
+
select {
- case w.Job <- job:
+ case w.Job <- evt:
default:
- timer := time.NewTimer(w.Timeout())
- select {
- case w.Job <- job:
- timer.Stop()
- case <-timer.C:
- log.Errorf(nil,
- "the %s watcher %s %s event queue is full[over %s], drop the event %v",
- w.Type(), w.Group(), w.Subject(), w.Timeout(), job)
- }
+ log.Errorf(nil, "the %s watcher %s %s event queue is full, drop the blocked events",
+ w.Type(), w.Group(), w.Subject())
+ w.cleanup()
+ w.Job <- evt
}
}
-func (w *InstanceEventListWatcher) Timeout() time.Duration {
- return AddJobTimeout
-}
-
-func (w *InstanceEventListWatcher) Close() {
- close(w.Job)
-}
-
-func NewInstanceEvent(serviceID, domainProject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent {
- return &InstanceEvent{
- Event: event.NewEvent(INSTANCE, domainProject, serviceID),
- Revision: rev,
- Response: response,
+func (w *InstanceSubscriber) cleanup() {
+ for {
+ select {
+ case evt, ok := <-w.Job:
+ if !ok {
+ return
+ }
+ metrics.ReportPublishCompleted(evt, errBusy)
+ default:
+ return
+ }
}
}
-func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
- return &InstanceEvent{
- Event: event.NewEventWithTime(INSTANCE, domainProject, serviceID, createAt),
- Revision: rev,
- Response: response,
- }
+func (w *InstanceSubscriber) Close() {
+ w.cleanup()
+ close(w.Job)
}
-func NewInstanceEventListWatcher(serviceID, domainProject string,
- listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *InstanceEventListWatcher {
- watcher := &InstanceEventListWatcher{
+func NewInstanceSubscriber(serviceID, domainProject string) *InstanceSubscriber {
+ watcher := &InstanceSubscriber{
Subscriber: event.NewSubscriber(INSTANCE, domainProject, serviceID),
Job: make(chan *InstanceEvent, INSTANCE.QueueSize()),
- ListFunc: listFunc,
- listCh: make(chan struct{}),
}
return watcher
}
diff --git a/server/metrics/connection.go b/server/metrics/connection.go
index aea7c97..c012980 100644
--- a/server/metrics/connection.go
+++ b/server/metrics/connection.go
@@ -44,6 +44,23 @@ var (
Objectives: metrics.Pxx,
}, []string{"instance", "source", "status"})
+ pendingGauge = helper.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: metrics.FamilyName,
+ Subsystem: "notify",
+ Name: "pending_total",
+ Help: "Counter of pending instance events",
+ }, []string{"instance", "source"})
+
+ pendingLatency = helper.NewSummaryVec(
+ prometheus.SummaryOpts{
+ Namespace: metrics.FamilyName,
+ Subsystem: "notify",
+ Name: "pending_durations_microseconds",
+ Help: "Latency of pending instance events",
+ Objectives: metrics.Pxx,
+ }, []string{"instance", "source"})
+
subscriberGauge = helper.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metrics.FamilyName,
@@ -62,10 +79,17 @@ func ReportPublishCompleted(evt event.Event, err error) {
}
notifyLatency.WithLabelValues(instance, evt.Type().String(), status).Observe(elapsed)
notifyCounter.WithLabelValues(instance, evt.Type().String(), status).Inc()
+ pendingGauge.WithLabelValues(instance, evt.Type().String()).Dec()
}
-func ReportSubscriber(domain, scheme string, n float64) {
+func ReportPendingCompleted(evt event.Event) {
instance := metrics.InstanceName()
+ elapsed := float64(time.Since(evt.CreateAt()).Nanoseconds()) / float64(time.Microsecond)
+ pendingLatency.WithLabelValues(instance, evt.Type().String()).Observe(elapsed)
+ pendingGauge.WithLabelValues(instance, evt.Type().String()).Inc()
+}
+func ReportSubscriber(domain, scheme string, n float64) {
+ instance := metrics.InstanceName()
subscriberGauge.WithLabelValues(instance, domain, scheme).Add(n)
}
diff --git a/server/rest/controller/v3/instance_watcher.go b/server/rest/controller/v3/instance_watcher.go
index 7395c4c..6d0690b 100644
--- a/server/rest/controller/v3/instance_watcher.go
+++ b/server/rest/controller/v3/instance_watcher.go
@@ -28,6 +28,5 @@ type WatchService struct {
func (this *WatchService) URLPatterns() []rest.Route {
return []rest.Route{
{rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/watcher", this.Watch},
- {rest.HTTPMethodGet, "/registry/v3/microservices/:serviceId/listwatcher", this.ListAndWatch},
}
}
diff --git a/server/rest/controller/v4/instance_watcher.go b/server/rest/controller/v4/instance_watcher.go
index e6de6f3..9396ee5 100644
--- a/server/rest/controller/v4/instance_watcher.go
+++ b/server/rest/controller/v4/instance_watcher.go
@@ -34,7 +34,6 @@ type WatchService struct {
func (s *WatchService) URLPatterns() []rest.Route {
return []rest.Route{
{Method: rest.HTTPMethodGet, Path: "/v4/:project/registry/microservices/:serviceId/watcher", Func: s.Watch},
- {Method: rest.HTTPMethodGet, Path: "/v4/:project/registry/microservices/:serviceId/listwatcher", Func: s.ListAndWatch},
}
}
@@ -63,16 +62,3 @@ func (s *WatchService) Watch(w http.ResponseWriter, r *http.Request) {
SelfServiceId: r.URL.Query().Get(":serviceId"),
}, conn)
}
-
-func (s *WatchService) ListAndWatch(w http.ResponseWriter, r *http.Request) {
- conn, err := upgrade(w, r)
- if err != nil {
- return
- }
- defer conn.Close()
-
- r.Method = "WATCHLIST"
- core.InstanceAPI.WebSocketListAndWatch(r.Context(), &pb.WatchInstanceRequest{
- SelfServiceId: r.URL.Query().Get(":serviceId"),
- }, conn)
-}
diff --git a/server/service/watch.go b/server/service/watch.go
index b3c00df..8813b0f 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -54,7 +54,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream proto.Servic
return err
}
- return grpc.ListAndWatch(stream.Context(), in.SelfServiceId, nil, stream)
+ return grpc.Watch(stream.Context(), in.SelfServiceId, stream)
}
func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
@@ -63,18 +63,7 @@ func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstan
ws.SendEstablishError(conn, err)
return
}
- ws.ListAndWatch(ctx, in.SelfServiceId, nil, conn)
-}
-
-func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
- log.Infof("new a web socket list and watch with service[%s]", in.SelfServiceId)
- if err := s.WatchPreOpera(ctx, in); err != nil {
- ws.SendEstablishError(conn, err)
- return
- }
- ws.ListAndWatch(ctx, in.SelfServiceId, func() ([]*pb.WatchInstanceResponse, int64) {
- return s.QueryAllProvidersInstances(ctx, in)
- }, conn)
+ ws.Watch(ctx, in.SelfServiceId, conn)
}
func (s *InstanceService) QueryAllProvidersInstances(ctx context.Context, in *pb.WatchInstanceRequest) ([]*pb.WatchInstanceResponse, int64) {
diff --git a/server/service/watch_test.go b/server/service/watch_test.go
index b663cc8..a058399 100644
--- a/server/service/watch_test.go
+++ b/server/service/watch_test.go
@@ -46,13 +46,6 @@ func TestInstanceService_WebSocketWatch(t *testing.T) {
instanceResource.WebSocketWatch(context.Background(), &pb.WatchInstanceRequest{}, nil)
}
-func TestInstanceService_WebSocketListAndWatch(t *testing.T) {
- defer func() {
- recover()
- }()
- instanceResource.WebSocketListAndWatch(context.Background(), &pb.WatchInstanceRequest{}, nil)
-}
-
var _ = Describe("'Instance' service", func() {
Describe("execute 'watch' operartion", func() {
var (