You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2019/02/14 03:02:34 UTC

[servicecomb-service-center] branch master updated: SCB-1092 Output event processing time (#533)

This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new f120eb3  SCB-1092 Output event processing time (#533)
f120eb3 is described below

commit f120eb3fbed95559002eeba432a30473c061a4cd
Author: little-cui <su...@qq.com>
AuthorDate: Thu Feb 14 11:02:28 2019 +0800

    SCB-1092 Output event processing time (#533)
    
    * SCB-1092 Output event processing time
    
    * SCB-1092 More abundant metrics information
---
 integration/health-metrics-grafana.json        | 330 ++++++++++++++++++++++---
 pkg/buffer/reader.go                           |  38 +++
 pkg/buffer/reader_test.go                      |  51 ++++
 pkg/client/sc/client_lb.go                     |  21 +-
 pkg/client/sc/client_lb_test.go                |  56 +++++
 pkg/notify/notice.go                           |   8 +-
 pkg/notify/notice_test.go                      |  33 +++
 pkg/rest/client.go                             |  54 ++--
 server/notify/listwatcher.go                   |  17 +-
 server/notify/metrics.go                       |   9 +-
 server/notify/stream.go                        |   4 +-
 server/notify/stream_test.go                   |  25 +-
 server/notify/websocket.go                     |   5 +-
 server/plugin/pkg/discovery/cacher.go          |   2 +-
 server/plugin/pkg/discovery/etcd/cacher_kv.go  |  23 +-
 server/plugin/pkg/discovery/metrics.go         |  20 +-
 server/plugin/pkg/discovery/types.go           |   7 +
 server/service/event/instance_event_handler.go |  11 +-
 server/service/event/rule_event_handler.go     |  16 +-
 server/service/event/tag_event_handler.go      |  20 +-
 20 files changed, 623 insertions(+), 127 deletions(-)

diff --git a/integration/health-metrics-grafana.json b/integration/health-metrics-grafana.json
index 005c2bb..e7acfb7 100644
--- a/integration/health-metrics-grafana.json
+++ b/integration/health-metrics-grafana.json
@@ -1157,7 +1157,7 @@
       "fill": 1,
       "gridPos": {
         "h": 6,
-        "w": 8,
+        "w": 6,
         "x": 12,
         "y": 13
       },
@@ -1239,6 +1239,99 @@
       }
     },
     {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_LOCAL}",
+      "fill": 1,
+      "gridPos": {
+        "h": 6,
+        "w": 6,
+        "x": 18,
+        "y": 13
+      },
+      "height": "",
+      "id": 42,
+      "legend": {
+        "alignAsTable": true,
+        "avg": true,
+        "current": false,
+        "hideEmpty": true,
+        "hideZero": true,
+        "max": true,
+        "min": true,
+        "rightSide": false,
+        "show": true,
+        "sort": "avg",
+        "sortDesc": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "minSpan": 4,
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "max(avg_over_time(service_center_notify_publish_durations_microseconds{job=\"service-center\"}[1m])) by (source)",
+          "format": "time_series",
+          "intervalFactor": 1,
+          "legendFormat": "{{source}}",
+          "refId": "B"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Events Latency",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "transparent": false,
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "µs",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
       "cacheTimeout": null,
       "colorBackground": false,
       "colorValue": true,
@@ -2030,25 +2123,205 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "sum(service_center_notify_publish_total{job=\"service-center\"}) by (instance,source)",
+          "expr": "sum(irate(service_center_db_backend_event_total{job=\"service-center\"}[1m])) by (instance,prefix)",
           "format": "time_series",
-          "instant": false,
           "intervalFactor": 1,
-          "legendFormat": "{{instance}}> {{source}}",
+          "legendFormat": "{{instance}}> {{prefix}}",
+          "refId": "B"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Publish Events",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "transparent": false,
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "decimals": 0,
+          "format": "ops",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_LOCAL}",
+      "fill": 1,
+      "gridPos": {
+        "h": 6,
+        "w": 6,
+        "x": 6,
+        "y": 34
+      },
+      "height": "",
+      "id": 41,
+      "legend": {
+        "alignAsTable": false,
+        "avg": true,
+        "current": false,
+        "hideEmpty": true,
+        "hideZero": true,
+        "max": true,
+        "min": true,
+        "rightSide": true,
+        "show": false,
+        "sort": "avg",
+        "sortDesc": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "minSpan": 4,
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "max(avg_over_time(service_center_db_backend_event_durations_microseconds{job=\"service-center\"}[1m])) by (instance, prefix)",
+          "format": "time_series",
+          "intervalFactor": 1,
+          "legendFormat": "{{instance}}> {{prefix}}",
           "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Publish Events Latency",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "transparent": false,
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "µs",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
         },
         {
-          "expr": "sum(service_center_db_backend_event_total{job=\"service-center\"}) by (instance)",
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_LOCAL}",
+      "fill": 1,
+      "gridPos": {
+        "h": 6,
+        "w": 6,
+        "x": 12,
+        "y": 34
+      },
+      "height": "",
+      "id": 40,
+      "legend": {
+        "alignAsTable": false,
+        "avg": false,
+        "current": false,
+        "hideEmpty": true,
+        "hideZero": true,
+        "max": false,
+        "min": false,
+        "rightSide": false,
+        "show": false,
+        "sort": "avg",
+        "sortDesc": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "minSpan": 4,
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(irate(service_center_notify_publish_total{job=\"service-center\"}[1m])) by (instance,source)",
           "format": "time_series",
+          "instant": false,
           "intervalFactor": 1,
-          "legendFormat": "{{instance}}> BACKEND",
-          "refId": "B"
+          "legendFormat": "{{instance}}> {{source}}",
+          "refId": "A"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Events Total",
+      "title": "Subscribe Events",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2066,7 +2339,7 @@
       "yaxes": [
         {
           "decimals": 0,
-          "format": "none",
+          "format": "ops",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -2097,7 +2370,7 @@
       "gridPos": {
         "h": 6,
         "w": 6,
-        "x": 6,
+        "x": 18,
         "y": 34
       },
       "height": "",
@@ -2137,19 +2410,12 @@
           "intervalFactor": 1,
           "legendFormat": "{{instance}}> {{source}}",
           "refId": "B"
-        },
-        {
-          "expr": "max(avg_over_time(service_center_db_backend_event_durations_microseconds{job=\"service-center\"}[1m])) by (instance)",
-          "format": "time_series",
-          "intervalFactor": 1,
-          "legendFormat": "{{instance}}> BACKEND",
-          "refId": "A"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Events Latency",
+      "title": "Subscribe Events Latency",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2197,8 +2463,8 @@
       "gridPos": {
         "h": 6,
         "w": 6,
-        "x": 12,
-        "y": 34
+        "x": 0,
+        "y": 40
       },
       "height": "",
       "id": 37,
@@ -2292,8 +2558,8 @@
       "gridPos": {
         "h": 6,
         "w": 6,
-        "x": 18,
-        "y": 34
+        "x": 6,
+        "y": 40
       },
       "height": "",
       "id": 38,
@@ -2381,7 +2647,7 @@
         "h": 3,
         "w": 24,
         "x": 0,
-        "y": 40
+        "y": 46
       },
       "height": "1px",
       "id": 4,
@@ -2402,7 +2668,7 @@
         "h": 7,
         "w": 8,
         "x": 0,
-        "y": 43
+        "y": 49
       },
       "height": "",
       "id": 9,
@@ -2490,7 +2756,7 @@
         "h": 7,
         "w": 8,
         "x": 8,
-        "y": 43
+        "y": 49
       },
       "id": 14,
       "legend": {
@@ -2575,7 +2841,7 @@
         "h": 7,
         "w": 8,
         "x": 16,
-        "y": 43
+        "y": 49
       },
       "id": 5,
       "legend": {
@@ -2660,7 +2926,7 @@
         "h": 7,
         "w": 8,
         "x": 0,
-        "y": 50
+        "y": 56
       },
       "id": 6,
       "legend": {
@@ -2745,7 +3011,7 @@
         "h": 7,
         "w": 8,
         "x": 8,
-        "y": 50
+        "y": 56
       },
       "height": "",
       "id": 2,
@@ -2834,7 +3100,7 @@
         "h": 7,
         "w": 8,
         "x": 16,
-        "y": 50
+        "y": 56
       },
       "height": "",
       "id": 8,
@@ -2921,7 +3187,7 @@
         "h": 7,
         "w": 24,
         "x": 0,
-        "y": 57
+        "y": 63
       },
       "id": 7,
       "legend": {
@@ -3014,7 +3280,7 @@
     "list": []
   },
   "time": {
-    "from": "now-1h",
+    "from": "now-5m",
     "to": "now"
   },
   "timepicker": {
@@ -3045,5 +3311,5 @@
   "timezone": "",
   "title": "ServiceCenter",
   "uid": "Zg6NoHGiz",
-  "version": 7
+  "version": 9
 }
\ No newline at end of file
diff --git a/pkg/buffer/reader.go b/pkg/buffer/reader.go
new file mode 100644
index 0000000..06a0c23
--- /dev/null
+++ b/pkg/buffer/reader.go
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buffer
+
+import (
+	"bytes"
+	"io"
+	"strings"
+)
+
+func ReadLine(buf *bytes.Buffer, cb func(line string) bool) error {
+	for {
+		s, err := buf.ReadString('\n')
+		if err != nil && err != io.EOF {
+			return err
+		}
+		if !cb(strings.TrimSpace(s)) {
+			break
+		}
+		if err != nil {
+			break
+		}
+	}
+	return nil
+}
diff --git a/pkg/buffer/reader_test.go b/pkg/buffer/reader_test.go
new file mode 100644
index 0000000..36caf39
--- /dev/null
+++ b/pkg/buffer/reader_test.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buffer
+
+import (
+	"bytes"
+	"testing"
+)
+
+func TestReadLine(t *testing.T) {
+	buf := bytes.NewBuffer([]byte("a\nb\r\nc\n "))
+	err := ReadLine(buf, func(line string) bool {
+		switch line {
+		case "a", "b", "c", "":
+		default:
+			t.Fatal("TestReadLine")
+		}
+		return true
+	})
+	if err != nil {
+		t.Fatal("TestReadLine", err)
+	}
+
+	buf = bytes.NewBuffer([]byte("a\nb"))
+	err = ReadLine(buf, func(line string) bool {
+		switch line {
+		case "a":
+		case "b":
+			t.Fatal("TestReadLine")
+		default:
+			t.Fatal("TestReadLine")
+		}
+		return false
+	})
+	if err != nil {
+		t.Fatal("TestReadLine", err)
+	}
+}
diff --git a/pkg/client/sc/client_lb.go b/pkg/client/sc/client_lb.go
index 55818b1..8b99b01 100644
--- a/pkg/client/sc/client_lb.go
+++ b/pkg/client/sc/client_lb.go
@@ -16,9 +16,11 @@
 package sc
 
 import (
-	"github.com/apache/servicecomb-service-center/pkg/backoff"
+	"errors"
+	"fmt"
 	"github.com/apache/servicecomb-service-center/pkg/lb"
 	"github.com/apache/servicecomb-service-center/pkg/rest"
+	"github.com/apache/servicecomb-service-center/pkg/util"
 	"golang.org/x/net/context"
 	"net/http"
 )
@@ -46,9 +48,18 @@ func (c *LBClient) Next() string {
 }
 
 func (c *LBClient) RestDoWithContext(ctx context.Context, method string, api string, headers http.Header, body []byte) (resp *http.Response, err error) {
-	err = backoff.DelayIn(c.Retries, func() (rerr error) {
-		resp, rerr = c.HttpDoWithContext(ctx, method, c.Next()+api, headers, body)
-		return
-	})
+	var errs []string
+	for i := 0; i < c.Retries; i++ {
+		addr := c.Next()
+		resp, err = c.HttpDoWithContext(ctx, method, addr+api, headers, body)
+		if err != nil {
+			errs = append(errs, fmt.Sprintf("[%s]: %s", addr, err.Error()))
+			continue
+		}
+		break
+	}
+	if err != nil {
+		err = errors.New(util.StringJoin(errs, ", "))
+	}
 	return
 }
diff --git a/pkg/client/sc/client_lb_test.go b/pkg/client/sc/client_lb_test.go
new file mode 100644
index 0000000..0dcf159
--- /dev/null
+++ b/pkg/client/sc/client_lb_test.go
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sc
+
+import (
+	"fmt"
+	"github.com/apache/servicecomb-service-center/pkg/rest"
+	"golang.org/x/net/context"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"testing"
+)
+
+func TestNewLBClient(t *testing.T) {
+	os.Setenv("DEBUG_MODE", "1")
+	client, err := NewLBClient([]string{"x.x.x.x", "rest://2.2.2.2"}, rest.DefaultURLClientOption())
+	if err != nil {
+		t.Fatal("TestNewLBClient", err)
+	}
+	_, err = client.RestDoWithContext(context.Background(), "yyy", "/zzz", http.Header{"test": []string{"a"}}, []byte(`abcdef`))
+	if err == nil {
+		t.Fatal("TestNewLBClient")
+	}
+	fmt.Println(err)
+
+	svc := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		w.WriteHeader(http.StatusOK)
+		b, _ := ioutil.ReadAll(req.Body)
+		w.Write(b)
+	}))
+	defer svc.Close()
+
+	client, err = NewLBClient([]string{"x.x.x.x", svc.URL}, rest.DefaultURLClientOption())
+	if err != nil {
+		t.Fatal("TestNewLBClient", err)
+	}
+	_, err = client.RestDoWithContext(context.Background(), http.MethodGet, "", http.Header{"test": []string{"a"}}, []byte(`abcdef`))
+	if err != nil {
+		t.Fatal("TestNewLBClient", err)
+	}
+}
diff --git a/pkg/notify/notice.go b/pkg/notify/notice.go
index e5ac265..cc3b272 100644
--- a/pkg/notify/notice.go
+++ b/pkg/notify/notice.go
@@ -51,6 +51,10 @@ func (s *baseEvent) CreateAt() time.Time {
 	return s.createAt.Local()
 }
 
-func NewEvent(t Type, s string, g string) Event {
-	return &baseEvent{t, s, g, simple.FromTime(time.Now())}
+func NewEvent(t Type, s, g string) Event {
+	return NewEventWithTime(t, s, g, simple.FromTime(time.Now()))
+}
+
+func NewEventWithTime(t Type, s, g string, now simple.Time) Event {
+	return &baseEvent{t, s, g, now}
 }
diff --git a/pkg/notify/notice_test.go b/pkg/notify/notice_test.go
new file mode 100644
index 0000000..5ad78d2
--- /dev/null
+++ b/pkg/notify/notice_test.go
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package notify
+
+import (
+	"fmt"
+	"testing"
+)
+
+func TestNewEventWithTime(t *testing.T) {
+	evt := NewEvent(NOTIFTY, "a", "b")
+	if evt.CreateAt().UnixNano() == 0 {
+		t.Fatal("TestNewEventWithTime")
+	}
+	fmt.Println(evt.CreateAt())
+
+	if evt.Type() != NOTIFTY || evt.Subject() != "a" || evt.Group() != "b" {
+		t.Fatal("TestNewEventWithTime")
+	}
+}
diff --git a/pkg/rest/client.go b/pkg/rest/client.go
index 8250637..6eb5843 100644
--- a/pkg/rest/client.go
+++ b/pkg/rest/client.go
@@ -21,12 +21,14 @@ import (
 	"crypto/tls"
 	"errors"
 	"fmt"
+	"github.com/apache/servicecomb-service-center/pkg/buffer"
 	"github.com/apache/servicecomb-service-center/pkg/tlsutil"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"golang.org/x/net/context"
 	"io"
 	"io/ioutil"
 	"net/http"
+	"net/http/httputil"
 	"net/url"
 	"os"
 	"strings"
@@ -118,10 +120,15 @@ func (client *URLClient) HttpDoWithContext(ctx context.Context, method string, r
 	req = req.WithContext(ctx)
 	req.Header = headers
 
+	DumpRequestOut(req)
+
 	resp, err = client.Client.Do(req)
 	if err != nil {
 		return nil, err
 	}
+
+	DumpResponse(resp)
+
 	switch resp.Header.Get(HEADER_CONTENT_ENCODING) {
 	case "gzip":
 		reader, err := NewGZipBodyReader(resp.Body)
@@ -133,32 +140,41 @@ func (client *URLClient) HttpDoWithContext(ctx context.Context, method string, r
 		resp.Body = reader
 	}
 
-	if os.Getenv("DEBUG_MODE") == "1" {
-		fmt.Println("--- BEGIN ---")
-		fmt.Printf("> %s %s %s\n", req.Method, req.URL.RequestURI(), req.Proto)
-		for key, header := range req.Header {
-			for _, value := range header {
-				fmt.Printf("> %s: %s\n", key, value)
-			}
-		}
-		fmt.Println(">")
-		fmt.Println(util.BytesToStringWithNoCopy(body))
-		fmt.Printf("< %s %s\n", resp.Proto, resp.Status)
-		for key, header := range resp.Header {
-			for _, value := range header {
-				fmt.Printf("< %s: %s\n", key, value)
-			}
-		}
-		fmt.Println("<")
-		fmt.Println("--- END ---")
-	}
 	return resp, nil
 }
 
+func DumpRequestOut(req *http.Request) {
+	if req == nil || !util.StringTRUE(os.Getenv("DEBUG_MODE")) {
+		return
+	}
+
+	b, _ := httputil.DumpRequestOut(req, true)
+	buffer.ReadLine(bytes.NewBuffer(b), func(line string) bool {
+		fmt.Println(">", line)
+		return true
+	})
+}
+
+func DumpResponse(resp *http.Response) {
+	if resp == nil || !util.StringTRUE(os.Getenv("DEBUG_MODE")) {
+		return
+	}
+
+	b, _ := httputil.DumpResponse(resp, true)
+	buffer.ReadLine(bytes.NewBuffer(b), func(line string) bool {
+		fmt.Println("<", line)
+		return true
+	})
+}
+
 func (client *URLClient) HttpDo(method string, rawURL string, headers http.Header, body []byte) (resp *http.Response, err error) {
 	return client.HttpDoWithContext(context.Background(), method, rawURL, headers, body)
 }
 
+func DefaultURLClientOption() URLClientOption {
+	return defaultURLClientOption
+}
+
 func setOptionDefaultValue(o *URLClientOption) URLClientOption {
 	if o == nil {
 		return defaultURLClientOption
diff --git a/server/notify/listwatcher.go b/server/notify/listwatcher.go
index 0b831d7..b5c5394 100644
--- a/server/notify/listwatcher.go
+++ b/server/notify/listwatcher.go
@@ -20,6 +20,7 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/gopool"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/notify"
+	simple "github.com/apache/servicecomb-service-center/pkg/time"
 	pb "github.com/apache/servicecomb-service-center/server/core/proto"
 	"golang.org/x/net/context"
 	"time"
@@ -124,18 +125,26 @@ func (w *InstanceEventListWatcher) Close() {
 	close(w.Job)
 }
 
-func NewInstanceEvent(group, subject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent {
+func NewInstanceEvent(serviceId, domainProject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent {
 	return &InstanceEvent{
-		Event:    notify.NewEvent(INSTANCE, subject, group),
+		Event:    notify.NewEvent(INSTANCE, domainProject, serviceId),
 		Revision: rev,
 		Response: response,
 	}
 }
 
-func NewInstanceEventListWatcher(group string, subject string,
+func NewInstanceEventWithTime(serviceId, domainProject string, rev int64, createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
+	return &InstanceEvent{
+		Event:    notify.NewEventWithTime(INSTANCE, domainProject, serviceId, createAt),
+		Revision: rev,
+		Response: response,
+	}
+}
+
+func NewInstanceEventListWatcher(serviceId, domainProject string,
 	listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *InstanceEventListWatcher {
 	watcher := &InstanceEventListWatcher{
-		Subscriber: notify.NewSubscriber(INSTANCE, subject, group),
+		Subscriber: notify.NewSubscriber(INSTANCE, domainProject, serviceId),
 		Job:        make(chan *InstanceEvent, INSTANCE.QueueSize()),
 		ListFunc:   listFunc,
 		listCh:     make(chan struct{}),
diff --git a/server/notify/metrics.go b/server/notify/metrics.go
index eac4c52..94fa7d9 100644
--- a/server/notify/metrics.go
+++ b/server/notify/metrics.go
@@ -16,6 +16,7 @@
 package notify
 
 import (
+	"github.com/apache/servicecomb-service-center/pkg/notify"
 	"github.com/apache/servicecomb-service-center/server/metric"
 	"github.com/prometheus/client_golang/prometheus"
 	"time"
@@ -49,13 +50,13 @@ func init() {
 	prometheus.MustRegister(notifyCounter, notifyLatency)
 }
 
-func ReportPublishCompleted(source string, err error, start time.Time) {
+func ReportPublishCompleted(evt notify.Event, err error) {
 	instance := metric.InstanceName()
-	elapsed := float64(time.Since(start).Nanoseconds()) / float64(time.Microsecond)
+	elapsed := float64(time.Since(evt.CreateAt()).Nanoseconds()) / float64(time.Microsecond)
 	status := success
 	if err != nil {
 		status = failure
 	}
-	notifyLatency.WithLabelValues(instance, source, status).Observe(elapsed)
-	notifyCounter.WithLabelValues(instance, source, status).Inc()
+	notifyLatency.WithLabelValues(instance, evt.Type().String(), status).Observe(elapsed)
+	notifyCounter.WithLabelValues(instance, evt.Type().String(), status).Inc()
 }
diff --git a/server/notify/stream.go b/server/notify/stream.go
index 0ac6403..3fe7371 100644
--- a/server/notify/stream.go
+++ b/server/notify/stream.go
@@ -49,7 +49,9 @@ func HandleWatchJob(watcher *InstanceEventListWatcher, stream pb.ServiceInstance
 				watcher.Subject(), watcher.Group())
 
 			err = stream.Send(resp)
-			ReportPublishCompleted(INSTANCE.String(), err, job.CreateAt())
+			if job != nil {
+				ReportPublishCompleted(job, err)
+			}
 			if err != nil {
 				log.Errorf(err, "send message error, subject: %s, group: %s",
 					watcher.Subject(), watcher.Group())
diff --git a/server/notify/stream_test.go b/server/notify/stream_test.go
index 60bd8d9..3280bf6 100644
--- a/server/notify/stream_test.go
+++ b/server/notify/stream_test.go
@@ -18,21 +18,36 @@ package notify
 
 import (
 	"github.com/apache/servicecomb-service-center/pkg/log"
+	simple "github.com/apache/servicecomb-service-center/pkg/time"
+	pb "github.com/apache/servicecomb-service-center/server/core/proto"
 	"golang.org/x/net/context"
+	"google.golang.org/grpc"
 	"testing"
+	"time"
 )
 
+type grpcWatchServer struct {
+	grpc.ServerStream
+}
+
+func (x *grpcWatchServer) Send(m *pb.WatchInstanceResponse) error {
+	return nil
+}
+
+func (x *grpcWatchServer) Context() context.Context {
+	return context.Background()
+}
+
 func TestHandleWatchJob(t *testing.T) {
-	defer log.Recover()
 	w := NewInstanceEventListWatcher("g", "s", nil)
 	w.Job <- nil
-	err := HandleWatchJob(w, nil)
+	err := HandleWatchJob(w, &grpcWatchServer{})
 	if err == nil {
 		t.Fatalf("TestHandleWatchJob failed")
 	}
-	w.Job <- NewInstanceEvent("g", "s", 1, nil)
-	err = HandleWatchJob(w, nil)
-	t.Fatalf("TestHandleWatchJob failed")
+	w.Job <- NewInstanceEventWithTime("g", "s", 1, simple.FromTime(time.Now()), nil)
+	w.Job <- nil
+	HandleWatchJob(w, &grpcWatchServer{})
 }
 
 func TestDoStreamListAndWatch(t *testing.T) {
diff --git a/server/notify/websocket.go b/server/notify/websocket.go
index cc789e8..033ab12 100644
--- a/server/notify/websocket.go
+++ b/server/notify/websocket.go
@@ -21,7 +21,6 @@ import (
 	"fmt"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
-	apt "github.com/apache/servicecomb-service-center/server/core"
 	pb "github.com/apache/servicecomb-service-center/server/core/proto"
 	serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
 	"github.com/gorilla/websocket"
@@ -239,7 +238,7 @@ func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) {
 
 	err := wh.WriteMessage(message)
 	if job != nil {
-		ReportPublishCompleted(INSTANCE.String(), err, job.CreateAt())
+		ReportPublishCompleted(job, err)
 	}
 	if err != nil {
 		log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: %s",
@@ -272,7 +271,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([]
 	socket := &WebSocket{
 		ctx:     ctx,
 		conn:    conn,
-		watcher: NewInstanceEventListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f),
+		watcher: NewInstanceEventListWatcher(serviceId, domainProject, f),
 	}
 	process(socket)
 }
diff --git a/server/plugin/pkg/discovery/cacher.go b/server/plugin/pkg/discovery/cacher.go
index 086d928..9b19719 100644
--- a/server/plugin/pkg/discovery/cacher.go
+++ b/server/plugin/pkg/discovery/cacher.go
@@ -40,7 +40,7 @@ func (c *CommonCacher) Notify(action proto.EventType, key string, kv *KeyValue)
 	default:
 		c.cache.Put(key, kv)
 	}
-	c.OnEvent(KvEvent{Type: action, KV: kv, Revision: kv.ModRevision})
+	c.OnEvent(NewKvEvent(action, kv, kv.ModRevision))
 }
 
 func (c *CommonCacher) OnEvent(evt KvEvent) {
diff --git a/server/plugin/pkg/discovery/etcd/cacher_kv.go b/server/plugin/pkg/discovery/etcd/cacher_kv.go
index 20596c1..323e1d8 100644
--- a/server/plugin/pkg/discovery/etcd/cacher_kv.go
+++ b/server/plugin/pkg/discovery/etcd/cacher_kv.go
@@ -132,7 +132,7 @@ func (c *KvCacher) handleWatcher(watcher Watcher) error {
 		rev := resp.Revision
 		evts := make([]discovery.KvEvent, 0, len(resp.Kvs))
 		for _, kv := range resp.Kvs {
-			evt := discovery.KvEvent{Revision: kv.ModRevision}
+			evt := discovery.NewKvEvent(proto.EVT_CREATE, nil, kv.ModRevision)
 			switch {
 			case resp.Action == registry.Put && kv.Version == 1:
 				evt.Type, evt.KV = proto.EVT_CREATE, c.doParse(kv)
@@ -252,11 +252,7 @@ func (c *KvCacher) filterDelete(newStore map[string]*mvccpb.KeyValue,
 			i = 0
 		}
 
-		block[i] = discovery.KvEvent{
-			Revision: rev,
-			Type:     proto.EVT_DELETE,
-			KV:       v,
-		}
+		block[i] = discovery.NewKvEvent(proto.EVT_DELETE, v, rev)
 		i++
 		return
 	})
@@ -283,11 +279,7 @@ func (c *KvCacher) filterCreateOrUpdate(newStore map[string]*mvccpb.KeyValue,
 			}
 
 			if kv := c.doParse(v); kv != nil {
-				block[i] = discovery.KvEvent{
-					Revision: rev,
-					Type:     proto.EVT_CREATE,
-					KV:       kv,
-				}
+				block[i] = discovery.NewKvEvent(proto.EVT_CREATE, kv, rev)
 				i++
 			}
 			continue
@@ -304,11 +296,7 @@ func (c *KvCacher) filterCreateOrUpdate(newStore map[string]*mvccpb.KeyValue,
 		}
 
 		if kv := c.doParse(v); kv != nil {
-			block[i] = discovery.KvEvent{
-				Revision: rev,
-				Type:     proto.EVT_UPDATE,
-				KV:       kv,
-			}
+			block[i] = discovery.NewKvEvent(proto.EVT_UPDATE, kv, rev)
 			i++
 		}
 	}
@@ -368,7 +356,6 @@ func (c *KvCacher) deferHandle(ctx context.Context) {
 }
 
 func (c *KvCacher) onEvents(evts []discovery.KvEvent) {
-	start := time.Now()
 	init := !c.IsReady()
 	for i, evt := range evts {
 		key := util.BytesToStringWithNoCopy(evt.KV.Key)
@@ -406,7 +393,7 @@ func (c *KvCacher) onEvents(evts []discovery.KvEvent) {
 
 	c.notify(evts)
 
-	discovery.ReportProcessEventCompleted(evts, start)
+	discovery.ReportProcessEventCompleted(c.Cfg.Key, evts)
 }
 
 func (c *KvCacher) notify(evts []discovery.KvEvent) {
diff --git a/server/plugin/pkg/discovery/metrics.go b/server/plugin/pkg/discovery/metrics.go
index 4e3737b..16681e8 100644
--- a/server/plugin/pkg/discovery/metrics.go
+++ b/server/plugin/pkg/discovery/metrics.go
@@ -22,11 +22,6 @@ import (
 	"time"
 )
 
-const (
-	success = "SUCCESS"
-	failure = "FAILURE"
-)
-
 var (
 	eventsCounter = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
@@ -34,7 +29,7 @@ var (
 			Subsystem: "db",
 			Name:      "backend_event_total",
 			Help:      "Counter of backend events",
-		}, []string{"instance"})
+		}, []string{"instance", "prefix"})
 
 	eventsLatency = prometheus.NewSummaryVec(
 		prometheus.SummaryOpts{
@@ -43,20 +38,23 @@ var (
 			Name:       "backend_event_durations_microseconds",
 			Help:       "Latency of backend events processing",
 			Objectives: prometheus.DefObjectives,
-		}, []string{"instance"})
+		}, []string{"instance", "prefix"})
 )
 
 func init() {
 	prometheus.MustRegister(eventsCounter, eventsLatency)
 }
 
-func ReportProcessEventCompleted(evts []KvEvent, start time.Time) {
+func ReportProcessEventCompleted(prefix string, evts []KvEvent) {
 	l := float64(len(evts))
 	if l == 0 {
 		return
 	}
 	instance := metric.InstanceName()
-	elapsed := float64(time.Since(start).Nanoseconds()) / float64(time.Microsecond)
-	eventsLatency.WithLabelValues(instance).Observe(elapsed / l)
-	eventsCounter.WithLabelValues(instance).Add(l)
+	now := time.Now()
+	for _, evt := range evts {
+		elapsed := float64(now.Sub(evt.CreateAt.Local()).Nanoseconds()) / float64(time.Microsecond)
+		eventsLatency.WithLabelValues(instance, prefix).Observe(elapsed)
+	}
+	eventsCounter.WithLabelValues(instance, prefix).Add(l)
 }
diff --git a/server/plugin/pkg/discovery/types.go b/server/plugin/pkg/discovery/types.go
index 1b759e7..d83856d 100644
--- a/server/plugin/pkg/discovery/types.go
+++ b/server/plugin/pkg/discovery/types.go
@@ -18,10 +18,12 @@ package discovery
 import (
 	"encoding/json"
 	"fmt"
+	simple "github.com/apache/servicecomb-service-center/pkg/time"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	pb "github.com/apache/servicecomb-service-center/server/core/proto"
 	"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
 	"strconv"
+	"time"
 )
 
 var (
@@ -85,6 +87,7 @@ type KvEvent struct {
 	Revision int64
 	Type     pb.EventType
 	KV       *KeyValue
+	CreateAt simple.Time
 }
 
 type KvEventFunc func(evt KvEvent)
@@ -93,3 +96,7 @@ type KvEventHandler interface {
 	Type() Type
 	OnEvent(evt KvEvent)
 }
+
+func NewKvEvent(action pb.EventType, kv *KeyValue, rev int64) KvEvent {
+	return KvEvent{Type: action, KV: kv, Revision: rev, CreateAt: simple.FromTime(time.Now())}
+}
diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go
index e82ec04..e4b99cf 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -88,15 +88,14 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
 		return
 	}
 
-	PublishInstanceEvent(domainProject, action, pb.MicroServiceToKey(domainProject, ms),
-		instance, evt.Revision, consumerIds)
+	PublishInstanceEvent(evt, domainProject, pb.MicroServiceToKey(domainProject, ms), consumerIds)
 }
 
 func NewInstanceEventHandler() *InstanceEventHandler {
 	return &InstanceEventHandler{}
 }
 
-func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey *pb.MicroServiceKey, instance *pb.MicroServiceInstance, rev int64, subscribers []string) {
+func PublishInstanceEvent(evt discovery.KvEvent, domainProject string, serviceKey *pb.MicroServiceKey, subscribers []string) {
 	defer cache.FindInstances.Remove(serviceKey)
 
 	if len(subscribers) == 0 {
@@ -105,13 +104,13 @@ func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey
 
 	response := &pb.WatchInstanceResponse{
 		Response: pb.CreateResponse(pb.Response_SUCCESS, "Watch instance successfully."),
-		Action:   string(action),
+		Action:   string(evt.Type),
 		Key:      serviceKey,
-		Instance: instance,
+		Instance: evt.KV.Value.(*pb.MicroServiceInstance),
 	}
 	for _, consumerId := range subscribers {
 		// TODO add超时怎么处理?
-		job := notify.NewInstanceEvent(consumerId, apt.GetInstanceRootKey(domainProject)+"/", rev, response)
+		job := notify.NewInstanceEventWithTime(consumerId, domainProject, evt.Revision, evt.CreateAt, response)
 		notify.NotifyCenter().Publish(job)
 	}
 }
diff --git a/server/service/event/rule_event_handler.go b/server/service/event/rule_event_handler.go
index 58051d9..6ac3a28 100644
--- a/server/service/event/rule_event_handler.go
+++ b/server/service/event/rule_event_handler.go
@@ -30,12 +30,13 @@ import (
 )
 
 type RulesChangedTask struct {
+	discovery.KvEvent
+
 	key string
 	err error
 
 	DomainProject string
 	ProviderId    string
-	Rev           int64
 }
 
 func (apt *RulesChangedTask) Key() string {
@@ -43,7 +44,7 @@ func (apt *RulesChangedTask) Key() string {
 }
 
 func (apt *RulesChangedTask) Do(ctx context.Context) error {
-	apt.err = apt.publish(ctx, apt.DomainProject, apt.ProviderId, apt.Rev)
+	apt.err = apt.publish(ctx, apt.DomainProject, apt.ProviderId)
 	return apt.err
 }
 
@@ -51,7 +52,7 @@ func (apt *RulesChangedTask) Err() error {
 	return apt.err
 }
 
-func (apt *RulesChangedTask) publish(ctx context.Context, domainProject, providerId string, rev int64) error {
+func (apt *RulesChangedTask) publish(ctx context.Context, domainProject, providerId string) error {
 	ctx = context.WithValue(context.WithValue(ctx,
 		serviceUtil.CTX_CACHEONLY, "1"),
 		serviceUtil.CTX_GLOBAL, "1")
@@ -74,7 +75,7 @@ func (apt *RulesChangedTask) publish(ctx context.Context, domainProject, provide
 	}
 	providerKey := pb.MicroServiceToKey(domainProject, provider)
 
-	PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, rev, consumerIds)
+	PublishInstanceEvent(apt.KvEvent, domainProject, providerKey, consumerIds)
 	return nil
 }
 
@@ -100,18 +101,19 @@ func (h *RuleEventHandler) OnEvent(evt discovery.KvEvent) {
 	log.Infof("caught [%s] service rule[%s/%s] event", action, providerId, ruleId)
 
 	task.Service().Add(context.Background(),
-		NewRulesChangedAsyncTask(domainProject, providerId, evt.Revision))
+		NewRulesChangedAsyncTask(domainProject, providerId, evt))
 }
 
 func NewRuleEventHandler() *RuleEventHandler {
 	return &RuleEventHandler{}
 }
 
-func NewRulesChangedAsyncTask(domainProject, providerId string, rev int64) *RulesChangedTask {
+func NewRulesChangedAsyncTask(domainProject, providerId string, evt discovery.KvEvent) *RulesChangedTask {
+	evt.Type = pb.EVT_EXPIRE
 	return &RulesChangedTask{
+		KvEvent:       evt,
 		key:           "RulesChangedAsyncTask_" + providerId,
 		DomainProject: domainProject,
 		ProviderId:    providerId,
-		Rev:           rev,
 	}
 }
diff --git a/server/service/event/tag_event_handler.go b/server/service/event/tag_event_handler.go
index 7059e8f..05f4ecb 100644
--- a/server/service/event/tag_event_handler.go
+++ b/server/service/event/tag_event_handler.go
@@ -31,12 +31,13 @@ import (
 )
 
 type TagsChangedTask struct {
+	discovery.KvEvent
+
 	key string
 	err error
 
 	DomainProject string
-	consumerId    string
-	Rev           int64
+	ConsumerId    string
 }
 
 func (apt *TagsChangedTask) Key() string {
@@ -44,7 +45,7 @@ func (apt *TagsChangedTask) Key() string {
 }
 
 func (apt *TagsChangedTask) Do(ctx context.Context) error {
-	apt.err = apt.publish(ctx, apt.DomainProject, apt.consumerId, apt.Rev)
+	apt.err = apt.publish(ctx, apt.DomainProject, apt.ConsumerId)
 	return apt.err
 }
 
@@ -52,7 +53,7 @@ func (apt *TagsChangedTask) Err() error {
 	return apt.err
 }
 
-func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumerId string, rev int64) error {
+func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumerId string) error {
 	ctx = context.WithValue(context.WithValue(ctx,
 		serviceUtil.CTX_CACHEONLY, "1"),
 		serviceUtil.CTX_GLOBAL, "1")
@@ -86,7 +87,7 @@ func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumer
 		}
 
 		providerKey := pb.MicroServiceToKey(domainProject, provider)
-		PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, rev, []string{consumerId})
+		PublishInstanceEvent(apt.KvEvent, domainProject, providerKey, []string{consumerId})
 	}
 	return nil
 }
@@ -114,18 +115,19 @@ func (h *TagEventHandler) OnEvent(evt discovery.KvEvent) {
 	log.Infof("caught [%s] service tags[%s/%s] event", action, consumerId, evt.KV.Value)
 
 	task.Service().Add(context.Background(),
-		NewTagsChangedAsyncTask(domainProject, consumerId, evt.Revision))
+		NewTagsChangedAsyncTask(domainProject, consumerId, evt))
 }
 
 func NewTagEventHandler() *TagEventHandler {
 	return &TagEventHandler{}
 }
 
-func NewTagsChangedAsyncTask(domainProject, consumerId string, rev int64) *TagsChangedTask {
+func NewTagsChangedAsyncTask(domainProject, consumerId string, evt discovery.KvEvent) *TagsChangedTask {
+	evt.Type = pb.EVT_EXPIRE
 	return &TagsChangedTask{
+		KvEvent:       evt,
 		key:           "TagsChangedAsyncTask_" + consumerId,
 		DomainProject: domainProject,
-		consumerId:    consumerId,
-		Rev:           rev,
+		ConsumerId:    consumerId,
 	}
 }