You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2019/02/14 03:02:34 UTC
[servicecomb-service-center] branch master updated: SCB-1092 Output
event processing time (#533)
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new f120eb3 SCB-1092 Output event processing time (#533)
f120eb3 is described below
commit f120eb3fbed95559002eeba432a30473c061a4cd
Author: little-cui <su...@qq.com>
AuthorDate: Thu Feb 14 11:02:28 2019 +0800
SCB-1092 Output event processing time (#533)
* SCB-1092 Output event processing time
* SCB-1092 More abundant metrics information
---
integration/health-metrics-grafana.json | 330 ++++++++++++++++++++++---
pkg/buffer/reader.go | 38 +++
pkg/buffer/reader_test.go | 51 ++++
pkg/client/sc/client_lb.go | 21 +-
pkg/client/sc/client_lb_test.go | 56 +++++
pkg/notify/notice.go | 8 +-
pkg/notify/notice_test.go | 33 +++
pkg/rest/client.go | 54 ++--
server/notify/listwatcher.go | 17 +-
server/notify/metrics.go | 9 +-
server/notify/stream.go | 4 +-
server/notify/stream_test.go | 25 +-
server/notify/websocket.go | 5 +-
server/plugin/pkg/discovery/cacher.go | 2 +-
server/plugin/pkg/discovery/etcd/cacher_kv.go | 23 +-
server/plugin/pkg/discovery/metrics.go | 20 +-
server/plugin/pkg/discovery/types.go | 7 +
server/service/event/instance_event_handler.go | 11 +-
server/service/event/rule_event_handler.go | 16 +-
server/service/event/tag_event_handler.go | 20 +-
20 files changed, 623 insertions(+), 127 deletions(-)
diff --git a/integration/health-metrics-grafana.json b/integration/health-metrics-grafana.json
index 005c2bb..e7acfb7 100644
--- a/integration/health-metrics-grafana.json
+++ b/integration/health-metrics-grafana.json
@@ -1157,7 +1157,7 @@
"fill": 1,
"gridPos": {
"h": 6,
- "w": 8,
+ "w": 6,
"x": 12,
"y": 13
},
@@ -1239,6 +1239,99 @@
}
},
{
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_LOCAL}",
+ "fill": 1,
+ "gridPos": {
+ "h": 6,
+ "w": 6,
+ "x": 18,
+ "y": 13
+ },
+ "height": "",
+ "id": 42,
+ "legend": {
+ "alignAsTable": true,
+ "avg": true,
+ "current": false,
+ "hideEmpty": true,
+ "hideZero": true,
+ "max": true,
+ "min": true,
+ "rightSide": false,
+ "show": true,
+ "sort": "avg",
+ "sortDesc": true,
+ "total": false,
+ "values": true
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "minSpan": 4,
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "max(avg_over_time(service_center_notify_publish_durations_microseconds{job=\"service-center\"}[1m])) by (source)",
+ "format": "time_series",
+ "intervalFactor": 1,
+ "legendFormat": "{{source}}",
+ "refId": "B"
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Events Latency",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "transparent": false,
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "µs",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
"cacheTimeout": null,
"colorBackground": false,
"colorValue": true,
@@ -2030,25 +2123,205 @@
"steppedLine": false,
"targets": [
{
- "expr": "sum(service_center_notify_publish_total{job=\"service-center\"}) by (instance,source)",
+ "expr": "sum(irate(service_center_db_backend_event_total{job=\"service-center\"}[1m])) by (instance,prefix)",
"format": "time_series",
- "instant": false,
"intervalFactor": 1,
- "legendFormat": "{{instance}}> {{source}}",
+ "legendFormat": "{{instance}}> {{prefix}}",
+ "refId": "B"
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Publish Events",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "transparent": false,
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "decimals": 0,
+ "format": "ops",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_LOCAL}",
+ "fill": 1,
+ "gridPos": {
+ "h": 6,
+ "w": 6,
+ "x": 6,
+ "y": 34
+ },
+ "height": "",
+ "id": 41,
+ "legend": {
+ "alignAsTable": false,
+ "avg": true,
+ "current": false,
+ "hideEmpty": true,
+ "hideZero": true,
+ "max": true,
+ "min": true,
+ "rightSide": true,
+ "show": false,
+ "sort": "avg",
+ "sortDesc": true,
+ "total": false,
+ "values": true
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "minSpan": 4,
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "max(avg_over_time(service_center_db_backend_event_durations_microseconds{job=\"service-center\"}[1m])) by (instance, prefix)",
+ "format": "time_series",
+ "intervalFactor": 1,
+ "legendFormat": "{{instance}}> {{prefix}}",
"refId": "A"
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Publish Events Latency",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "transparent": false,
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "µs",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
},
{
- "expr": "sum(service_center_db_backend_event_total{job=\"service-center\"}) by (instance)",
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_LOCAL}",
+ "fill": 1,
+ "gridPos": {
+ "h": 6,
+ "w": 6,
+ "x": 12,
+ "y": 34
+ },
+ "height": "",
+ "id": 40,
+ "legend": {
+ "alignAsTable": false,
+ "avg": false,
+ "current": false,
+ "hideEmpty": true,
+ "hideZero": true,
+ "max": false,
+ "min": false,
+ "rightSide": false,
+ "show": false,
+ "sort": "avg",
+ "sortDesc": true,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "minSpan": 4,
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "sum(irate(service_center_notify_publish_total{job=\"service-center\"}[1m])) by (instance,source)",
"format": "time_series",
+ "instant": false,
"intervalFactor": 1,
- "legendFormat": "{{instance}}> BACKEND",
- "refId": "B"
+ "legendFormat": "{{instance}}> {{source}}",
+ "refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Events Total",
+ "title": "Subscribe Events",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2066,7 +2339,7 @@
"yaxes": [
{
"decimals": 0,
- "format": "none",
+ "format": "ops",
"label": null,
"logBase": 1,
"max": null,
@@ -2097,7 +2370,7 @@
"gridPos": {
"h": 6,
"w": 6,
- "x": 6,
+ "x": 18,
"y": 34
},
"height": "",
@@ -2137,19 +2410,12 @@
"intervalFactor": 1,
"legendFormat": "{{instance}}> {{source}}",
"refId": "B"
- },
- {
- "expr": "max(avg_over_time(service_center_db_backend_event_durations_microseconds{job=\"service-center\"}[1m])) by (instance)",
- "format": "time_series",
- "intervalFactor": 1,
- "legendFormat": "{{instance}}> BACKEND",
- "refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
- "title": "Events Latency",
+ "title": "Subscribe Events Latency",
"tooltip": {
"shared": true,
"sort": 0,
@@ -2197,8 +2463,8 @@
"gridPos": {
"h": 6,
"w": 6,
- "x": 12,
- "y": 34
+ "x": 0,
+ "y": 40
},
"height": "",
"id": 37,
@@ -2292,8 +2558,8 @@
"gridPos": {
"h": 6,
"w": 6,
- "x": 18,
- "y": 34
+ "x": 6,
+ "y": 40
},
"height": "",
"id": 38,
@@ -2381,7 +2647,7 @@
"h": 3,
"w": 24,
"x": 0,
- "y": 40
+ "y": 46
},
"height": "1px",
"id": 4,
@@ -2402,7 +2668,7 @@
"h": 7,
"w": 8,
"x": 0,
- "y": 43
+ "y": 49
},
"height": "",
"id": 9,
@@ -2490,7 +2756,7 @@
"h": 7,
"w": 8,
"x": 8,
- "y": 43
+ "y": 49
},
"id": 14,
"legend": {
@@ -2575,7 +2841,7 @@
"h": 7,
"w": 8,
"x": 16,
- "y": 43
+ "y": 49
},
"id": 5,
"legend": {
@@ -2660,7 +2926,7 @@
"h": 7,
"w": 8,
"x": 0,
- "y": 50
+ "y": 56
},
"id": 6,
"legend": {
@@ -2745,7 +3011,7 @@
"h": 7,
"w": 8,
"x": 8,
- "y": 50
+ "y": 56
},
"height": "",
"id": 2,
@@ -2834,7 +3100,7 @@
"h": 7,
"w": 8,
"x": 16,
- "y": 50
+ "y": 56
},
"height": "",
"id": 8,
@@ -2921,7 +3187,7 @@
"h": 7,
"w": 24,
"x": 0,
- "y": 57
+ "y": 63
},
"id": 7,
"legend": {
@@ -3014,7 +3280,7 @@
"list": []
},
"time": {
- "from": "now-1h",
+ "from": "now-5m",
"to": "now"
},
"timepicker": {
@@ -3045,5 +3311,5 @@
"timezone": "",
"title": "ServiceCenter",
"uid": "Zg6NoHGiz",
- "version": 7
+ "version": 9
}
\ No newline at end of file
diff --git a/pkg/buffer/reader.go b/pkg/buffer/reader.go
new file mode 100644
index 0000000..06a0c23
--- /dev/null
+++ b/pkg/buffer/reader.go
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buffer
+
+import (
+ "bytes"
+ "io"
+ "strings"
+)
+
+func ReadLine(buf *bytes.Buffer, cb func(line string) bool) error {
+ for {
+ s, err := buf.ReadString('\n')
+ if err != nil && err != io.EOF {
+ return err
+ }
+ if !cb(strings.TrimSpace(s)) {
+ break
+ }
+ if err != nil {
+ break
+ }
+ }
+ return nil
+}
diff --git a/pkg/buffer/reader_test.go b/pkg/buffer/reader_test.go
new file mode 100644
index 0000000..36caf39
--- /dev/null
+++ b/pkg/buffer/reader_test.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buffer
+
+import (
+ "bytes"
+ "testing"
+)
+
+func TestReadLine(t *testing.T) {
+ buf := bytes.NewBuffer([]byte("a\nb\r\nc\n "))
+ err := ReadLine(buf, func(line string) bool {
+ switch line {
+ case "a", "b", "c", "":
+ default:
+ t.Fatal("TestReadLine")
+ }
+ return true
+ })
+ if err != nil {
+ t.Fatal("TestReadLine", err)
+ }
+
+ buf = bytes.NewBuffer([]byte("a\nb"))
+ err = ReadLine(buf, func(line string) bool {
+ switch line {
+ case "a":
+ case "b":
+ t.Fatal("TestReadLine")
+ default:
+ t.Fatal("TestReadLine")
+ }
+ return false
+ })
+ if err != nil {
+ t.Fatal("TestReadLine", err)
+ }
+}
diff --git a/pkg/client/sc/client_lb.go b/pkg/client/sc/client_lb.go
index 55818b1..8b99b01 100644
--- a/pkg/client/sc/client_lb.go
+++ b/pkg/client/sc/client_lb.go
@@ -16,9 +16,11 @@
package sc
import (
- "github.com/apache/servicecomb-service-center/pkg/backoff"
+ "errors"
+ "fmt"
"github.com/apache/servicecomb-service-center/pkg/lb"
"github.com/apache/servicecomb-service-center/pkg/rest"
+ "github.com/apache/servicecomb-service-center/pkg/util"
"golang.org/x/net/context"
"net/http"
)
@@ -46,9 +48,18 @@ func (c *LBClient) Next() string {
}
func (c *LBClient) RestDoWithContext(ctx context.Context, method string, api string, headers http.Header, body []byte) (resp *http.Response, err error) {
- err = backoff.DelayIn(c.Retries, func() (rerr error) {
- resp, rerr = c.HttpDoWithContext(ctx, method, c.Next()+api, headers, body)
- return
- })
+ var errs []string
+ for i := 0; i < c.Retries; i++ {
+ addr := c.Next()
+ resp, err = c.HttpDoWithContext(ctx, method, addr+api, headers, body)
+ if err != nil {
+ errs = append(errs, fmt.Sprintf("[%s]: %s", addr, err.Error()))
+ continue
+ }
+ break
+ }
+ if err != nil {
+ err = errors.New(util.StringJoin(errs, ", "))
+ }
return
}
diff --git a/pkg/client/sc/client_lb_test.go b/pkg/client/sc/client_lb_test.go
new file mode 100644
index 0000000..0dcf159
--- /dev/null
+++ b/pkg/client/sc/client_lb_test.go
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sc
+
+import (
+ "fmt"
+ "github.com/apache/servicecomb-service-center/pkg/rest"
+ "golang.org/x/net/context"
+ "io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "testing"
+)
+
+func TestNewLBClient(t *testing.T) {
+ os.Setenv("DEBUG_MODE", "1")
+ client, err := NewLBClient([]string{"x.x.x.x", "rest://2.2.2.2"}, rest.DefaultURLClientOption())
+ if err != nil {
+ t.Fatal("TestNewLBClient", err)
+ }
+ _, err = client.RestDoWithContext(context.Background(), "yyy", "/zzz", http.Header{"test": []string{"a"}}, []byte(`abcdef`))
+ if err == nil {
+ t.Fatal("TestNewLBClient")
+ }
+ fmt.Println(err)
+
+ svc := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ w.WriteHeader(http.StatusOK)
+ b, _ := ioutil.ReadAll(req.Body)
+ w.Write(b)
+ }))
+ defer svc.Close()
+
+ client, err = NewLBClient([]string{"x.x.x.x", svc.URL}, rest.DefaultURLClientOption())
+ if err != nil {
+ t.Fatal("TestNewLBClient", err)
+ }
+ _, err = client.RestDoWithContext(context.Background(), http.MethodGet, "", http.Header{"test": []string{"a"}}, []byte(`abcdef`))
+ if err != nil {
+ t.Fatal("TestNewLBClient", err)
+ }
+}
diff --git a/pkg/notify/notice.go b/pkg/notify/notice.go
index e5ac265..cc3b272 100644
--- a/pkg/notify/notice.go
+++ b/pkg/notify/notice.go
@@ -51,6 +51,10 @@ func (s *baseEvent) CreateAt() time.Time {
return s.createAt.Local()
}
-func NewEvent(t Type, s string, g string) Event {
- return &baseEvent{t, s, g, simple.FromTime(time.Now())}
+func NewEvent(t Type, s, g string) Event {
+ return NewEventWithTime(t, s, g, simple.FromTime(time.Now()))
+}
+
+func NewEventWithTime(t Type, s, g string, now simple.Time) Event {
+ return &baseEvent{t, s, g, now}
}
diff --git a/pkg/notify/notice_test.go b/pkg/notify/notice_test.go
new file mode 100644
index 0000000..5ad78d2
--- /dev/null
+++ b/pkg/notify/notice_test.go
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package notify
+
+import (
+ "fmt"
+ "testing"
+)
+
+func TestNewEventWithTime(t *testing.T) {
+ evt := NewEvent(NOTIFTY, "a", "b")
+ if evt.CreateAt().UnixNano() == 0 {
+ t.Fatal("TestNewEventWithTime")
+ }
+ fmt.Println(evt.CreateAt())
+
+ if evt.Type() != NOTIFTY || evt.Subject() != "a" || evt.Group() != "b" {
+ t.Fatal("TestNewEventWithTime")
+ }
+}
diff --git a/pkg/rest/client.go b/pkg/rest/client.go
index 8250637..6eb5843 100644
--- a/pkg/rest/client.go
+++ b/pkg/rest/client.go
@@ -21,12 +21,14 @@ import (
"crypto/tls"
"errors"
"fmt"
+ "github.com/apache/servicecomb-service-center/pkg/buffer"
"github.com/apache/servicecomb-service-center/pkg/tlsutil"
"github.com/apache/servicecomb-service-center/pkg/util"
"golang.org/x/net/context"
"io"
"io/ioutil"
"net/http"
+ "net/http/httputil"
"net/url"
"os"
"strings"
@@ -118,10 +120,15 @@ func (client *URLClient) HttpDoWithContext(ctx context.Context, method string, r
req = req.WithContext(ctx)
req.Header = headers
+ DumpRequestOut(req)
+
resp, err = client.Client.Do(req)
if err != nil {
return nil, err
}
+
+ DumpResponse(resp)
+
switch resp.Header.Get(HEADER_CONTENT_ENCODING) {
case "gzip":
reader, err := NewGZipBodyReader(resp.Body)
@@ -133,32 +140,41 @@ func (client *URLClient) HttpDoWithContext(ctx context.Context, method string, r
resp.Body = reader
}
- if os.Getenv("DEBUG_MODE") == "1" {
- fmt.Println("--- BEGIN ---")
- fmt.Printf("> %s %s %s\n", req.Method, req.URL.RequestURI(), req.Proto)
- for key, header := range req.Header {
- for _, value := range header {
- fmt.Printf("> %s: %s\n", key, value)
- }
- }
- fmt.Println(">")
- fmt.Println(util.BytesToStringWithNoCopy(body))
- fmt.Printf("< %s %s\n", resp.Proto, resp.Status)
- for key, header := range resp.Header {
- for _, value := range header {
- fmt.Printf("< %s: %s\n", key, value)
- }
- }
- fmt.Println("<")
- fmt.Println("--- END ---")
- }
return resp, nil
}
+func DumpRequestOut(req *http.Request) {
+ if req == nil || !util.StringTRUE(os.Getenv("DEBUG_MODE")) {
+ return
+ }
+
+ b, _ := httputil.DumpRequestOut(req, true)
+ buffer.ReadLine(bytes.NewBuffer(b), func(line string) bool {
+ fmt.Println(">", line)
+ return true
+ })
+}
+
+func DumpResponse(resp *http.Response) {
+ if resp == nil || !util.StringTRUE(os.Getenv("DEBUG_MODE")) {
+ return
+ }
+
+ b, _ := httputil.DumpResponse(resp, true)
+ buffer.ReadLine(bytes.NewBuffer(b), func(line string) bool {
+ fmt.Println("<", line)
+ return true
+ })
+}
+
func (client *URLClient) HttpDo(method string, rawURL string, headers http.Header, body []byte) (resp *http.Response, err error) {
return client.HttpDoWithContext(context.Background(), method, rawURL, headers, body)
}
+func DefaultURLClientOption() URLClientOption {
+ return defaultURLClientOption
+}
+
func setOptionDefaultValue(o *URLClientOption) URLClientOption {
if o == nil {
return defaultURLClientOption
diff --git a/server/notify/listwatcher.go b/server/notify/listwatcher.go
index 0b831d7..b5c5394 100644
--- a/server/notify/listwatcher.go
+++ b/server/notify/listwatcher.go
@@ -20,6 +20,7 @@ import (
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/notify"
+ simple "github.com/apache/servicecomb-service-center/pkg/time"
pb "github.com/apache/servicecomb-service-center/server/core/proto"
"golang.org/x/net/context"
"time"
@@ -124,18 +125,26 @@ func (w *InstanceEventListWatcher) Close() {
close(w.Job)
}
-func NewInstanceEvent(group, subject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent {
+func NewInstanceEvent(serviceId, domainProject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent {
return &InstanceEvent{
- Event: notify.NewEvent(INSTANCE, subject, group),
+ Event: notify.NewEvent(INSTANCE, domainProject, serviceId),
Revision: rev,
Response: response,
}
}
-func NewInstanceEventListWatcher(group string, subject string,
+func NewInstanceEventWithTime(serviceId, domainProject string, rev int64, createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
+ return &InstanceEvent{
+ Event: notify.NewEventWithTime(INSTANCE, domainProject, serviceId, createAt),
+ Revision: rev,
+ Response: response,
+ }
+}
+
+func NewInstanceEventListWatcher(serviceId, domainProject string,
listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *InstanceEventListWatcher {
watcher := &InstanceEventListWatcher{
- Subscriber: notify.NewSubscriber(INSTANCE, subject, group),
+ Subscriber: notify.NewSubscriber(INSTANCE, domainProject, serviceId),
Job: make(chan *InstanceEvent, INSTANCE.QueueSize()),
ListFunc: listFunc,
listCh: make(chan struct{}),
diff --git a/server/notify/metrics.go b/server/notify/metrics.go
index eac4c52..94fa7d9 100644
--- a/server/notify/metrics.go
+++ b/server/notify/metrics.go
@@ -16,6 +16,7 @@
package notify
import (
+ "github.com/apache/servicecomb-service-center/pkg/notify"
"github.com/apache/servicecomb-service-center/server/metric"
"github.com/prometheus/client_golang/prometheus"
"time"
@@ -49,13 +50,13 @@ func init() {
prometheus.MustRegister(notifyCounter, notifyLatency)
}
-func ReportPublishCompleted(source string, err error, start time.Time) {
+func ReportPublishCompleted(evt notify.Event, err error) {
instance := metric.InstanceName()
- elapsed := float64(time.Since(start).Nanoseconds()) / float64(time.Microsecond)
+ elapsed := float64(time.Since(evt.CreateAt()).Nanoseconds()) / float64(time.Microsecond)
status := success
if err != nil {
status = failure
}
- notifyLatency.WithLabelValues(instance, source, status).Observe(elapsed)
- notifyCounter.WithLabelValues(instance, source, status).Inc()
+ notifyLatency.WithLabelValues(instance, evt.Type().String(), status).Observe(elapsed)
+ notifyCounter.WithLabelValues(instance, evt.Type().String(), status).Inc()
}
diff --git a/server/notify/stream.go b/server/notify/stream.go
index 0ac6403..3fe7371 100644
--- a/server/notify/stream.go
+++ b/server/notify/stream.go
@@ -49,7 +49,9 @@ func HandleWatchJob(watcher *InstanceEventListWatcher, stream pb.ServiceInstance
watcher.Subject(), watcher.Group())
err = stream.Send(resp)
- ReportPublishCompleted(INSTANCE.String(), err, job.CreateAt())
+ if job != nil {
+ ReportPublishCompleted(job, err)
+ }
if err != nil {
log.Errorf(err, "send message error, subject: %s, group: %s",
watcher.Subject(), watcher.Group())
diff --git a/server/notify/stream_test.go b/server/notify/stream_test.go
index 60bd8d9..3280bf6 100644
--- a/server/notify/stream_test.go
+++ b/server/notify/stream_test.go
@@ -18,21 +18,36 @@ package notify
import (
"github.com/apache/servicecomb-service-center/pkg/log"
+ simple "github.com/apache/servicecomb-service-center/pkg/time"
+ pb "github.com/apache/servicecomb-service-center/server/core/proto"
"golang.org/x/net/context"
+ "google.golang.org/grpc"
"testing"
+ "time"
)
+type grpcWatchServer struct {
+ grpc.ServerStream
+}
+
+func (x *grpcWatchServer) Send(m *pb.WatchInstanceResponse) error {
+ return nil
+}
+
+func (x *grpcWatchServer) Context() context.Context {
+ return context.Background()
+}
+
func TestHandleWatchJob(t *testing.T) {
- defer log.Recover()
w := NewInstanceEventListWatcher("g", "s", nil)
w.Job <- nil
- err := HandleWatchJob(w, nil)
+ err := HandleWatchJob(w, &grpcWatchServer{})
if err == nil {
t.Fatalf("TestHandleWatchJob failed")
}
- w.Job <- NewInstanceEvent("g", "s", 1, nil)
- err = HandleWatchJob(w, nil)
- t.Fatalf("TestHandleWatchJob failed")
+ w.Job <- NewInstanceEventWithTime("g", "s", 1, simple.FromTime(time.Now()), nil)
+ w.Job <- nil
+ HandleWatchJob(w, &grpcWatchServer{})
}
func TestDoStreamListAndWatch(t *testing.T) {
diff --git a/server/notify/websocket.go b/server/notify/websocket.go
index cc789e8..033ab12 100644
--- a/server/notify/websocket.go
+++ b/server/notify/websocket.go
@@ -21,7 +21,6 @@ import (
"fmt"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
- apt "github.com/apache/servicecomb-service-center/server/core"
pb "github.com/apache/servicecomb-service-center/server/core/proto"
serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
"github.com/gorilla/websocket"
@@ -239,7 +238,7 @@ func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) {
err := wh.WriteMessage(message)
if job != nil {
- ReportPublishCompleted(INSTANCE.String(), err, job.CreateAt())
+ ReportPublishCompleted(job, err)
}
if err != nil {
log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: %s",
@@ -272,7 +271,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([]
socket := &WebSocket{
ctx: ctx,
conn: conn,
- watcher: NewInstanceEventListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f),
+ watcher: NewInstanceEventListWatcher(serviceId, domainProject, f),
}
process(socket)
}
diff --git a/server/plugin/pkg/discovery/cacher.go b/server/plugin/pkg/discovery/cacher.go
index 086d928..9b19719 100644
--- a/server/plugin/pkg/discovery/cacher.go
+++ b/server/plugin/pkg/discovery/cacher.go
@@ -40,7 +40,7 @@ func (c *CommonCacher) Notify(action proto.EventType, key string, kv *KeyValue)
default:
c.cache.Put(key, kv)
}
- c.OnEvent(KvEvent{Type: action, KV: kv, Revision: kv.ModRevision})
+ c.OnEvent(NewKvEvent(action, kv, kv.ModRevision))
}
func (c *CommonCacher) OnEvent(evt KvEvent) {
diff --git a/server/plugin/pkg/discovery/etcd/cacher_kv.go b/server/plugin/pkg/discovery/etcd/cacher_kv.go
index 20596c1..323e1d8 100644
--- a/server/plugin/pkg/discovery/etcd/cacher_kv.go
+++ b/server/plugin/pkg/discovery/etcd/cacher_kv.go
@@ -132,7 +132,7 @@ func (c *KvCacher) handleWatcher(watcher Watcher) error {
rev := resp.Revision
evts := make([]discovery.KvEvent, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
- evt := discovery.KvEvent{Revision: kv.ModRevision}
+ evt := discovery.NewKvEvent(proto.EVT_CREATE, nil, kv.ModRevision)
switch {
case resp.Action == registry.Put && kv.Version == 1:
evt.Type, evt.KV = proto.EVT_CREATE, c.doParse(kv)
@@ -252,11 +252,7 @@ func (c *KvCacher) filterDelete(newStore map[string]*mvccpb.KeyValue,
i = 0
}
- block[i] = discovery.KvEvent{
- Revision: rev,
- Type: proto.EVT_DELETE,
- KV: v,
- }
+ block[i] = discovery.NewKvEvent(proto.EVT_DELETE, v, rev)
i++
return
})
@@ -283,11 +279,7 @@ func (c *KvCacher) filterCreateOrUpdate(newStore map[string]*mvccpb.KeyValue,
}
if kv := c.doParse(v); kv != nil {
- block[i] = discovery.KvEvent{
- Revision: rev,
- Type: proto.EVT_CREATE,
- KV: kv,
- }
+ block[i] = discovery.NewKvEvent(proto.EVT_CREATE, kv, rev)
i++
}
continue
@@ -304,11 +296,7 @@ func (c *KvCacher) filterCreateOrUpdate(newStore map[string]*mvccpb.KeyValue,
}
if kv := c.doParse(v); kv != nil {
- block[i] = discovery.KvEvent{
- Revision: rev,
- Type: proto.EVT_UPDATE,
- KV: kv,
- }
+ block[i] = discovery.NewKvEvent(proto.EVT_UPDATE, kv, rev)
i++
}
}
@@ -368,7 +356,6 @@ func (c *KvCacher) deferHandle(ctx context.Context) {
}
func (c *KvCacher) onEvents(evts []discovery.KvEvent) {
- start := time.Now()
init := !c.IsReady()
for i, evt := range evts {
key := util.BytesToStringWithNoCopy(evt.KV.Key)
@@ -406,7 +393,7 @@ func (c *KvCacher) onEvents(evts []discovery.KvEvent) {
c.notify(evts)
- discovery.ReportProcessEventCompleted(evts, start)
+ discovery.ReportProcessEventCompleted(c.Cfg.Key, evts)
}
func (c *KvCacher) notify(evts []discovery.KvEvent) {
diff --git a/server/plugin/pkg/discovery/metrics.go b/server/plugin/pkg/discovery/metrics.go
index 4e3737b..16681e8 100644
--- a/server/plugin/pkg/discovery/metrics.go
+++ b/server/plugin/pkg/discovery/metrics.go
@@ -22,11 +22,6 @@ import (
"time"
)
-const (
- success = "SUCCESS"
- failure = "FAILURE"
-)
-
var (
eventsCounter = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
@@ -34,7 +29,7 @@ var (
Subsystem: "db",
Name: "backend_event_total",
Help: "Counter of backend events",
- }, []string{"instance"})
+ }, []string{"instance", "prefix"})
eventsLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
@@ -43,20 +38,23 @@ var (
Name: "backend_event_durations_microseconds",
Help: "Latency of backend events processing",
Objectives: prometheus.DefObjectives,
- }, []string{"instance"})
+ }, []string{"instance", "prefix"})
)
func init() {
prometheus.MustRegister(eventsCounter, eventsLatency)
}
-func ReportProcessEventCompleted(evts []KvEvent, start time.Time) {
+func ReportProcessEventCompleted(prefix string, evts []KvEvent) {
l := float64(len(evts))
if l == 0 {
return
}
instance := metric.InstanceName()
- elapsed := float64(time.Since(start).Nanoseconds()) / float64(time.Microsecond)
- eventsLatency.WithLabelValues(instance).Observe(elapsed / l)
- eventsCounter.WithLabelValues(instance).Add(l)
+ now := time.Now()
+ for _, evt := range evts {
+ elapsed := float64(now.Sub(evt.CreateAt.Local()).Nanoseconds()) / float64(time.Microsecond)
+ eventsLatency.WithLabelValues(instance, prefix).Observe(elapsed)
+ }
+ eventsCounter.WithLabelValues(instance, prefix).Add(l)
}
diff --git a/server/plugin/pkg/discovery/types.go b/server/plugin/pkg/discovery/types.go
index 1b759e7..d83856d 100644
--- a/server/plugin/pkg/discovery/types.go
+++ b/server/plugin/pkg/discovery/types.go
@@ -18,10 +18,12 @@ package discovery
import (
"encoding/json"
"fmt"
+ simple "github.com/apache/servicecomb-service-center/pkg/time"
"github.com/apache/servicecomb-service-center/pkg/util"
pb "github.com/apache/servicecomb-service-center/server/core/proto"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
"strconv"
+ "time"
)
var (
@@ -85,6 +87,7 @@ type KvEvent struct {
Revision int64
Type pb.EventType
KV *KeyValue
+ CreateAt simple.Time
}
type KvEventFunc func(evt KvEvent)
@@ -93,3 +96,7 @@ type KvEventHandler interface {
Type() Type
OnEvent(evt KvEvent)
}
+
+func NewKvEvent(action pb.EventType, kv *KeyValue, rev int64) KvEvent {
+ return KvEvent{Type: action, KV: kv, Revision: rev, CreateAt: simple.FromTime(time.Now())}
+}
diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go
index e82ec04..e4b99cf 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -88,15 +88,14 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
return
}
- PublishInstanceEvent(domainProject, action, pb.MicroServiceToKey(domainProject, ms),
- instance, evt.Revision, consumerIds)
+ PublishInstanceEvent(evt, domainProject, pb.MicroServiceToKey(domainProject, ms), consumerIds)
}
func NewInstanceEventHandler() *InstanceEventHandler {
return &InstanceEventHandler{}
}
-func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey *pb.MicroServiceKey, instance *pb.MicroServiceInstance, rev int64, subscribers []string) {
+func PublishInstanceEvent(evt discovery.KvEvent, domainProject string, serviceKey *pb.MicroServiceKey, subscribers []string) {
defer cache.FindInstances.Remove(serviceKey)
if len(subscribers) == 0 {
@@ -105,13 +104,13 @@ func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey
response := &pb.WatchInstanceResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "Watch instance successfully."),
- Action: string(action),
+ Action: string(evt.Type),
Key: serviceKey,
- Instance: instance,
+ Instance: evt.KV.Value.(*pb.MicroServiceInstance),
}
for _, consumerId := range subscribers {
// TODO add超时怎么处理?
- job := notify.NewInstanceEvent(consumerId, apt.GetInstanceRootKey(domainProject)+"/", rev, response)
+ job := notify.NewInstanceEventWithTime(consumerId, domainProject, evt.Revision, evt.CreateAt, response)
notify.NotifyCenter().Publish(job)
}
}
diff --git a/server/service/event/rule_event_handler.go b/server/service/event/rule_event_handler.go
index 58051d9..6ac3a28 100644
--- a/server/service/event/rule_event_handler.go
+++ b/server/service/event/rule_event_handler.go
@@ -30,12 +30,13 @@ import (
)
type RulesChangedTask struct {
+ discovery.KvEvent
+
key string
err error
DomainProject string
ProviderId string
- Rev int64
}
func (apt *RulesChangedTask) Key() string {
@@ -43,7 +44,7 @@ func (apt *RulesChangedTask) Key() string {
}
func (apt *RulesChangedTask) Do(ctx context.Context) error {
- apt.err = apt.publish(ctx, apt.DomainProject, apt.ProviderId, apt.Rev)
+ apt.err = apt.publish(ctx, apt.DomainProject, apt.ProviderId)
return apt.err
}
@@ -51,7 +52,7 @@ func (apt *RulesChangedTask) Err() error {
return apt.err
}
-func (apt *RulesChangedTask) publish(ctx context.Context, domainProject, providerId string, rev int64) error {
+func (apt *RulesChangedTask) publish(ctx context.Context, domainProject, providerId string) error {
ctx = context.WithValue(context.WithValue(ctx,
serviceUtil.CTX_CACHEONLY, "1"),
serviceUtil.CTX_GLOBAL, "1")
@@ -74,7 +75,7 @@ func (apt *RulesChangedTask) publish(ctx context.Context, domainProject, provide
}
providerKey := pb.MicroServiceToKey(domainProject, provider)
- PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, rev, consumerIds)
+ PublishInstanceEvent(apt.KvEvent, domainProject, providerKey, consumerIds)
return nil
}
@@ -100,18 +101,19 @@ func (h *RuleEventHandler) OnEvent(evt discovery.KvEvent) {
log.Infof("caught [%s] service rule[%s/%s] event", action, providerId, ruleId)
task.Service().Add(context.Background(),
- NewRulesChangedAsyncTask(domainProject, providerId, evt.Revision))
+ NewRulesChangedAsyncTask(domainProject, providerId, evt))
}
func NewRuleEventHandler() *RuleEventHandler {
return &RuleEventHandler{}
}
-func NewRulesChangedAsyncTask(domainProject, providerId string, rev int64) *RulesChangedTask {
+func NewRulesChangedAsyncTask(domainProject, providerId string, evt discovery.KvEvent) *RulesChangedTask {
+ evt.Type = pb.EVT_EXPIRE
return &RulesChangedTask{
+ KvEvent: evt,
key: "RulesChangedAsyncTask_" + providerId,
DomainProject: domainProject,
ProviderId: providerId,
- Rev: rev,
}
}
diff --git a/server/service/event/tag_event_handler.go b/server/service/event/tag_event_handler.go
index 7059e8f..05f4ecb 100644
--- a/server/service/event/tag_event_handler.go
+++ b/server/service/event/tag_event_handler.go
@@ -31,12 +31,13 @@ import (
)
type TagsChangedTask struct {
+ discovery.KvEvent
+
key string
err error
DomainProject string
- consumerId string
- Rev int64
+ ConsumerId string
}
func (apt *TagsChangedTask) Key() string {
@@ -44,7 +45,7 @@ func (apt *TagsChangedTask) Key() string {
}
func (apt *TagsChangedTask) Do(ctx context.Context) error {
- apt.err = apt.publish(ctx, apt.DomainProject, apt.consumerId, apt.Rev)
+ apt.err = apt.publish(ctx, apt.DomainProject, apt.ConsumerId)
return apt.err
}
@@ -52,7 +53,7 @@ func (apt *TagsChangedTask) Err() error {
return apt.err
}
-func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumerId string, rev int64) error {
+func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumerId string) error {
ctx = context.WithValue(context.WithValue(ctx,
serviceUtil.CTX_CACHEONLY, "1"),
serviceUtil.CTX_GLOBAL, "1")
@@ -86,7 +87,7 @@ func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumer
}
providerKey := pb.MicroServiceToKey(domainProject, provider)
- PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, rev, []string{consumerId})
+ PublishInstanceEvent(apt.KvEvent, domainProject, providerKey, []string{consumerId})
}
return nil
}
@@ -114,18 +115,19 @@ func (h *TagEventHandler) OnEvent(evt discovery.KvEvent) {
log.Infof("caught [%s] service tags[%s/%s] event", action, consumerId, evt.KV.Value)
task.Service().Add(context.Background(),
- NewTagsChangedAsyncTask(domainProject, consumerId, evt.Revision))
+ NewTagsChangedAsyncTask(domainProject, consumerId, evt))
}
func NewTagEventHandler() *TagEventHandler {
return &TagEventHandler{}
}
-func NewTagsChangedAsyncTask(domainProject, consumerId string, rev int64) *TagsChangedTask {
+func NewTagsChangedAsyncTask(domainProject, consumerId string, evt discovery.KvEvent) *TagsChangedTask {
+ evt.Type = pb.EVT_EXPIRE
return &TagsChangedTask{
+ KvEvent: evt,
key: "TagsChangedAsyncTask_" + consumerId,
DomainProject: domainProject,
- consumerId: consumerId,
- Rev: rev,
+ ConsumerId: consumerId,
}
}