You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by as...@apache.org on 2018/09/04 11:26:24 UTC

[incubator-servicecomb-service-center] branch master updated: SCB-890 Lost changed event when bootstrap with embedded etcd (#435)

This is an automated email from the ASF dual-hosted git repository.

asifdxtreme pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fc039b  SCB-890 Lost changed event when bootstrap with embedded etcd (#435)
9fc039b is described below

commit 9fc039bda4e01adb71ae3f78ef3c9c0683228e2f
Author: little-cui <su...@qq.com>
AuthorDate: Tue Sep 4 19:26:22 2018 +0800

    SCB-890 Lost changed event when bootstrap with embedded etcd (#435)
---
 .../infra/registry/embededetcd/embededetcd.go      | 83 ++++++++++++----------
 1 file changed, 44 insertions(+), 39 deletions(-)

diff --git a/server/plugin/infra/registry/embededetcd/embededetcd.go b/server/plugin/infra/registry/embededetcd/embededetcd.go
index 20e7525..4f92853 100644
--- a/server/plugin/infra/registry/embededetcd/embededetcd.go
+++ b/server/plugin/infra/registry/embededetcd/embededetcd.go
@@ -418,35 +418,9 @@ func (s *EtcdEmbed) Watch(ctx context.Context, opts ...registry.PluginOpOption)
 					return
 				}
 
-				l := len(resp.Events)
-				kvs := make([]*mvccpb.KeyValue, l)
-				pIdx, prevAction := 0, mvccpb.PUT
-				pResp := &registry.PluginResponse{Action: registry.Put, Succeeded: true}
-
-				for _, evt := range resp.Events {
-					if prevAction != evt.Type {
-						prevAction = evt.Type
-
-						if pIdx > 0 {
-							err = setResponseAndCallback(pResp, kvs[:pIdx], op.WatchCallback)
-							if err != nil {
-								return
-							}
-							pIdx = 0
-						}
-					}
-
-					pResp.Revision = evt.Kv.ModRevision
-					pResp.Action = setKvsAndConvertAction(kvs, pIdx, &evt)
-
-					pIdx++
-				}
-
-				if pIdx > 0 {
-					err = setResponseAndCallback(pResp, kvs[:pIdx], op.WatchCallback)
-					if err != nil {
-						return
-					}
+				err = dispatch(resp.Events, op.WatchCallback)
+				if err != nil {
+					return
 				}
 			}
 		}
@@ -478,7 +452,39 @@ func (s *EtcdEmbed) readyNotify() {
 	}
 }
 
-func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *mvccpb.Event) registry.ActionType {
+func dispatch(evts []mvccpb.Event, cb registry.WatchCallback) error {
+	l := len(evts)
+	kvs := make([]*mvccpb.KeyValue, l)
+	sIdx, eIdx, rev := 0, 0, int64(0)
+	action, prevEvtType := registry.Put, mvccpb.PUT
+
+	for _, evt := range evts {
+		if prevEvtType != evt.Type {
+			if eIdx > 0 {
+				err := callback(action, rev, kvs[sIdx:eIdx], cb)
+				if err != nil {
+					return err
+				}
+				sIdx = eIdx
+			}
+			prevEvtType = evt.Type
+		}
+
+		if rev < evt.Kv.ModRevision {
+			rev = evt.Kv.ModRevision
+		}
+		action = setKvsAndConvertAction(kvs, eIdx, evt)
+
+		eIdx++
+	}
+
+	if eIdx > 0 {
+		return callback(action, rev, kvs[sIdx:eIdx], cb)
+	}
+	return nil
+}
+
+func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt mvccpb.Event) registry.ActionType {
 	switch evt.Type {
 	case mvccpb.DELETE:
 		kv := evt.PrevKv
@@ -493,15 +499,14 @@ func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *mvccpb.Event)
 	}
 }
 
-func setResponseAndCallback(pResp *registry.PluginResponse, kvs []*mvccpb.KeyValue, cb registry.WatchCallback) error {
-	pResp.Count = int64(len(kvs))
-	pResp.Kvs = kvs
-
-	err := cb("key information changed", pResp)
-	if err != nil {
-		return err
-	}
-	return nil
+func callback(action registry.ActionType, rev int64, kvs []*mvccpb.KeyValue, cb registry.WatchCallback) error {
+	return cb("key information changed", &registry.PluginResponse{
+		Action:    action,
+		Kvs:       kvs,
+		Count:     int64(len(kvs)),
+		Revision:  rev,
+		Succeeded: true,
+	})
 }
 
 func getEmbedInstance() mgr.PluginInstance {