You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by as...@apache.org on 2018/09/04 11:26:24 UTC
[incubator-servicecomb-service-center] branch master updated:
SCB-890 Lost changed event when bootstrap with embedded etcd (#435)
This is an automated email from the ASF dual-hosted git repository.
asifdxtreme pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 9fc039b SCB-890 Lost changed event when bootstrap with embedded etcd (#435)
9fc039b is described below
commit 9fc039bda4e01adb71ae3f78ef3c9c0683228e2f
Author: little-cui <su...@qq.com>
AuthorDate: Tue Sep 4 19:26:22 2018 +0800
SCB-890 Lost changed event when bootstrap with embedded etcd (#435)
---
.../infra/registry/embededetcd/embededetcd.go | 83 ++++++++++++----------
1 file changed, 44 insertions(+), 39 deletions(-)
diff --git a/server/plugin/infra/registry/embededetcd/embededetcd.go b/server/plugin/infra/registry/embededetcd/embededetcd.go
index 20e7525..4f92853 100644
--- a/server/plugin/infra/registry/embededetcd/embededetcd.go
+++ b/server/plugin/infra/registry/embededetcd/embededetcd.go
@@ -418,35 +418,9 @@ func (s *EtcdEmbed) Watch(ctx context.Context, opts ...registry.PluginOpOption)
return
}
- l := len(resp.Events)
- kvs := make([]*mvccpb.KeyValue, l)
- pIdx, prevAction := 0, mvccpb.PUT
- pResp := &registry.PluginResponse{Action: registry.Put, Succeeded: true}
-
- for _, evt := range resp.Events {
- if prevAction != evt.Type {
- prevAction = evt.Type
-
- if pIdx > 0 {
- err = setResponseAndCallback(pResp, kvs[:pIdx], op.WatchCallback)
- if err != nil {
- return
- }
- pIdx = 0
- }
- }
-
- pResp.Revision = evt.Kv.ModRevision
- pResp.Action = setKvsAndConvertAction(kvs, pIdx, &evt)
-
- pIdx++
- }
-
- if pIdx > 0 {
- err = setResponseAndCallback(pResp, kvs[:pIdx], op.WatchCallback)
- if err != nil {
- return
- }
+ err = dispatch(resp.Events, op.WatchCallback)
+ if err != nil {
+ return
}
}
}
@@ -478,7 +452,39 @@ func (s *EtcdEmbed) readyNotify() {
}
}
-func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *mvccpb.Event) registry.ActionType {
+func dispatch(evts []mvccpb.Event, cb registry.WatchCallback) error {
+ l := len(evts)
+ kvs := make([]*mvccpb.KeyValue, l)
+ sIdx, eIdx, rev := 0, 0, int64(0)
+ action, prevEvtType := registry.Put, mvccpb.PUT
+
+ for _, evt := range evts {
+ if prevEvtType != evt.Type {
+ if eIdx > 0 {
+ err := callback(action, rev, kvs[sIdx:eIdx], cb)
+ if err != nil {
+ return err
+ }
+ sIdx = eIdx
+ }
+ prevEvtType = evt.Type
+ }
+
+ if rev < evt.Kv.ModRevision {
+ rev = evt.Kv.ModRevision
+ }
+ action = setKvsAndConvertAction(kvs, eIdx, evt)
+
+ eIdx++
+ }
+
+ if eIdx > 0 {
+ return callback(action, rev, kvs[sIdx:eIdx], cb)
+ }
+ return nil
+}
+
+func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt mvccpb.Event) registry.ActionType {
switch evt.Type {
case mvccpb.DELETE:
kv := evt.PrevKv
@@ -493,15 +499,14 @@ func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *mvccpb.Event)
}
}
-func setResponseAndCallback(pResp *registry.PluginResponse, kvs []*mvccpb.KeyValue, cb registry.WatchCallback) error {
- pResp.Count = int64(len(kvs))
- pResp.Kvs = kvs
-
- err := cb("key information changed", pResp)
- if err != nil {
- return err
- }
- return nil
+func callback(action registry.ActionType, rev int64, kvs []*mvccpb.KeyValue, cb registry.WatchCallback) error {
+ return cb("key information changed", &registry.PluginResponse{
+ Action: action,
+ Kvs: kvs,
+ Count: int64(len(kvs)),
+ Revision: rev,
+ Succeeded: true,
+ })
}
func getEmbedInstance() mgr.PluginInstance {