You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2020/01/14 01:49:00 UTC

[servicecomb-kie] branch master updated: SCB-1718 support use revision param to query data (#69)

This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git


The following commit(s) were added to refs/heads/master by this push:
     new 545ebf7  SCB-1718 support use revision param to query data (#69)
545ebf7 is described below

commit 545ebf73ec3487a44c56b2dd17f626ccde064d55
Author: Shawn <xi...@gmail.com>
AuthorDate: Tue Jan 14 09:48:52 2020 +0800

    SCB-1718 support use revision param to query data (#69)
---
 examples/dev/db.js                     |   2 +-
 server/resource/v1/common.go           |  94 ++++++++++++-----------
 server/resource/v1/common_test.go      |   8 --
 server/resource/v1/doc_struct.go       |   9 +++
 server/resource/v1/kv_resource.go      | 135 ++++++++++++++++++---------------
 server/resource/v1/kv_resource_test.go |  75 +++++++++++++++++-
 6 files changed, 207 insertions(+), 116 deletions(-)

diff --git a/examples/dev/db.js b/examples/dev/db.js
index afd91c3..17f6cef 100644
--- a/examples/dev/db.js
+++ b/examples/dev/db.js
@@ -47,4 +47,4 @@ db.createCollection( "kv", {
         } }
 } );
 db.kv.createIndex({"id": 1}, { unique: true } );
-db.kv.createIndex({key: 1, label_id: 1},{ unique: true });
\ No newline at end of file
+db.kv.createIndex({key: 1, label_id: 1,domain:1,project:1},{ unique: true });
\ No newline at end of file
diff --git a/server/resource/v1/common.go b/server/resource/v1/common.go
index d529e79..85e3e41 100644
--- a/server/resource/v1/common.go
+++ b/server/resource/v1/common.go
@@ -18,9 +18,9 @@
 package v1
 
 import (
+	"context"
 	"encoding/json"
 	"errors"
-	"fmt"
 	"github.com/apache/servicecomb-kie/server/pubsub"
 	"github.com/apache/servicecomb-kie/server/service"
 	uuid "github.com/satori/go.uuid"
@@ -30,8 +30,6 @@ import (
 	"time"
 
 	"github.com/apache/servicecomb-kie/pkg/common"
-	"github.com/apache/servicecomb-kie/pkg/model"
-
 	goRestful "github.com/emicklei/go-restful"
 	"github.com/go-chassis/go-chassis/server/restful"
 	"github.com/go-mesh/openlogging"
@@ -46,7 +44,14 @@ const (
 		"label can not be duplicated, please check query parameters"
 	MsgIllegalDepth     = "X-Depth must be number"
 	MsgInvalidWait      = "wait param should be formed with number and time unit like 5s,100ms, and less than 5m"
+	MsgInvalidRev       = "revision param should be formed with number greater than 0"
 	ErrKvIDMustNotEmpty = "must supply kv id if you want to remove key"
+
+	MaxWait = 5 * time.Minute
+)
+
+var (
+	ErrInvalidRev = errors.New(MsgInvalidRev)
 )
 
 //ReadDomain get domain info from attribute
@@ -103,12 +108,6 @@ func WriteErrResponse(context *restful.Context, status int, msg, contentType str
 
 }
 
-//InfoLog record info
-func InfoLog(action string, kv *model.KVDoc) {
-	openlogging.Info(
-		fmt.Sprintf("[%s] [%s] success", action, kv.Key))
-}
-
 func readRequest(ctx *restful.Context, v interface{}) error {
 	if ctx.ReadHeader(common.HeaderContentType) == common.ContentTypeYaml {
 		return yaml.NewDecoder(ctx.ReadRequest().Body).Decode(v)
@@ -132,11 +131,13 @@ func writeResponse(ctx *restful.Context, v interface{}) error {
 	}
 	return ctx.WriteJSON(v, goRestful.MIME_JSON) // json is default
 }
-
-//GetLabels parse labels
-func GetLabels(labelsSlice []string) (map[string]string, error) {
-	labels := make(map[string]string, len(labelsSlice))
-	for _, v := range labelsSlice {
+func getLabels(rctx *restful.Context) (map[string]string, error) {
+	labelSlice := rctx.Req.QueryParameters("label")
+	if len(labelSlice) == 0 {
+		return nil, nil
+	}
+	labels := make(map[string]string, len(labelSlice))
+	for _, v := range labelSlice {
 		v := strings.Split(v, ":")
 		if len(v) != 2 {
 			return nil, errors.New(MsgIllegalLabels)
@@ -145,45 +146,46 @@ func GetLabels(labelsSlice []string) (map[string]string, error) {
 	}
 	return labels, nil
 }
-func getWaitDuration(rctx *restful.Context) string {
-	wait := rctx.ReadQueryParameter(common.QueryParamWait)
-	if wait == "" {
-		wait = "0s"
+func isRevised(ctx context.Context, revStr string) (bool, error) {
+	rev, err := strconv.ParseInt(revStr, 10, 64)
+	if err != nil {
+		return false, ErrInvalidRev
 	}
-	return wait
-}
-func getRevision(rctx *restful.Context) string {
-	rev := rctx.ReadQueryParameter(common.QueryParamRev)
-	if rev == "" {
-		rev = "0"
+	latest, err := service.RevisionService.GetRevision(ctx)
+	if err != nil {
+		return false, err
+	}
+	if latest > rev {
+		return true, nil
 	}
-	return rev
+	return false, nil
 }
 func getMatchPattern(rctx *restful.Context) string {
 	m := rctx.ReadQueryParameter(common.QueryParamMatch)
-
 	if m != "" && m != PatternExact {
 		return ""
 	}
 	return m
 }
-func wait(d time.Duration, rctx *restful.Context, topic *pubsub.Topic) bool {
-	changed := true
-	if d != 0 {
-		o := &pubsub.Observer{
-			UUID:      uuid.NewV4().String(),
-			RemoteIP:  rctx.ReadRequest().RemoteAddr, //TODO x forward ip
-			UserAgent: rctx.ReadHeader("User-Agent"),
-			Event:     make(chan *pubsub.KVChangeEvent, 1),
-		}
-		pubsub.ObserveOnce(o, topic)
-		select {
-		case <-time.After(d):
-			changed = false
-		case <-o.Event:
-		}
-	}
-	return changed
+func eventHappened(rctx *restful.Context, waitStr string, topic *pubsub.Topic) (bool, error) {
+	d, err := time.ParseDuration(waitStr)
+	if err != nil || d > MaxWait {
+		return false, errors.New(MsgInvalidWait)
+	}
+	happened := true
+	o := &pubsub.Observer{
+		UUID:      uuid.NewV4().String(),
+		RemoteIP:  rctx.ReadRequest().RemoteAddr, //TODO x forward ip
+		UserAgent: rctx.ReadHeader("User-Agent"),
+		Event:     make(chan *pubsub.KVChangeEvent, 1),
+	}
+	pubsub.ObserveOnce(o, topic)
+	select {
+	case <-time.After(d):
+		happened = false
+	case <-o.Event:
+	}
+	return happened, nil
 }
 func checkPagination(limitStr, offsetStr string) (int64, int64, error) {
 	var err error
@@ -226,6 +228,12 @@ func queryAndResponse(rctx *restful.Context,
 		WriteErrResponse(rctx, http.StatusInternalServerError, err.Error(), common.ContentTypeText)
 		return
 	}
+	rev, err := service.RevisionService.GetRevision(rctx.Ctx)
+	if err != nil {
+		WriteErrResponse(rctx, http.StatusInternalServerError, err.Error(), common.ContentTypeText)
+		return
+	}
+	rctx.ReadResponseWriter().Header().Set(common.HeaderRevision, strconv.FormatInt(rev, 10))
 	err = writeResponse(rctx, kv)
 	if err != nil {
 		openlogging.Error(err.Error())
diff --git a/server/resource/v1/common_test.go b/server/resource/v1/common_test.go
index d1ba7a7..e57b805 100644
--- a/server/resource/v1/common_test.go
+++ b/server/resource/v1/common_test.go
@@ -42,12 +42,4 @@ func TestGetLabels(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, 1, len(c))
 
-	r, err = http.NewRequest("GET",
-		"/kv?label=app:mall&label=service:payment",
-		nil)
-	assert.NoError(t, err)
-	req := restful.NewRequest(r)
-	m, err := v1.GetLabels(req.QueryParameters("label"))
-	assert.NoError(t, err)
-	assert.Equal(t, 2, len(m))
 }
diff --git a/server/resource/v1/doc_struct.go b/server/resource/v1/doc_struct.go
index 5ad7a4e..47635ce 100644
--- a/server/resource/v1/doc_struct.go
+++ b/server/resource/v1/doc_struct.go
@@ -59,6 +59,15 @@ var (
 			"for example wait=5s, server will not response until 5 seconds during that time window, " +
 			"if any kv changed, server will return 200 and kv list, otherwise return 304 and empty body",
 	}
+	DocQueryRev = &restful.Parameters{
+		DataType:  "string",
+		Name:      common.QueryParamRev,
+		ParamType: goRestful.QueryParameterKind,
+		Required:  false,
+		Desc: "each time you query,server will return a number in header X-Kie-Revision. " +
+			"you can record it in client side, use this number as param value. " +
+			"if current revision is greater than it, server will return data",
+	}
 	DocQueryMatch = &restful.Parameters{
 		DataType:  "string",
 		Name:      common.QueryParamMatch,
diff --git a/server/resource/v1/kv_resource.go b/server/resource/v1/kv_resource.go
index f5335b1..02db656 100644
--- a/server/resource/v1/kv_resource.go
+++ b/server/resource/v1/kv_resource.go
@@ -20,16 +20,14 @@ package v1
 
 import (
 	"fmt"
-	"github.com/apache/servicecomb-kie/server/pubsub"
-	"net/http"
-	"time"
-
 	"github.com/apache/servicecomb-kie/pkg/common"
 	"github.com/apache/servicecomb-kie/pkg/model"
+	"github.com/apache/servicecomb-kie/server/pubsub"
 	"github.com/apache/servicecomb-kie/server/service"
 	goRestful "github.com/emicklei/go-restful"
 	"github.com/go-chassis/go-chassis/server/restful"
 	"github.com/go-mesh/openlogging"
+	"net/http"
 )
 
 //KVResource has API about kv operations
@@ -49,6 +47,7 @@ func (r *KVResource) Put(context *restful.Context) {
 	domain := ReadDomain(context)
 	if domain == nil {
 		WriteErrResponse(context, http.StatusInternalServerError, MsgDomainMustNotBeEmpty, common.ContentTypeText)
+		return
 	}
 	kv.Key = key
 	kv.Domain = domain.(string)
@@ -69,7 +68,8 @@ func (r *KVResource) Put(context *restful.Context) {
 	if err != nil {
 		openlogging.Warn("lost kv change event:" + err.Error())
 	}
-	InfoLog("put", kv)
+	openlogging.Info(
+		fmt.Sprintf("put [%s] success", kv.Key))
 	err = writeResponse(context, kv)
 	if err != nil {
 		openlogging.Error(err.Error())
@@ -86,41 +86,17 @@ func (r *KVResource) GetByKey(rctx *restful.Context) {
 		return
 	}
 	project := rctx.ReadPathParameter("project")
-	labelSlice := rctx.Req.QueryParameters("label")
-	var labels map[string]string
-	if len(labelSlice) != 0 {
-		labels, err = GetLabels(labelSlice)
-		if err != nil {
-			WriteErrResponse(rctx, http.StatusBadRequest, MsgIllegalLabels, common.ContentTypeText)
-			return
-		}
+	labels, err := getLabels(rctx)
+	if err != nil {
+		WriteErrResponse(rctx, http.StatusBadRequest, MsgIllegalLabels, common.ContentTypeText)
+		return
 	}
 	domain := ReadDomain(rctx)
 	if domain == nil {
 		WriteErrResponse(rctx, http.StatusInternalServerError, MsgDomainMustNotBeEmpty, common.ContentTypeText)
 		return
 	}
-	waitStr := getWaitDuration(rctx)
-	d, err := time.ParseDuration(waitStr)
-	if err != nil || d > 5*time.Minute {
-		WriteErrResponse(rctx, http.StatusBadRequest, MsgInvalidWait, common.ContentTypeText)
-		return
-	}
-	if d == 0 {
-		queryAndResponse(rctx, domain, project, key, labels, 0, 0)
-		return
-	}
-	changed := wait(d, rctx, &pubsub.Topic{
-		Key:      key,
-		Labels:   labels,
-		Project:  project,
-		DomainID: domain.(string),
-	})
-	if changed {
-		queryAndResponse(rctx, domain, project, key, labels, 0, 0)
-		return
-	}
-	rctx.WriteHeader(http.StatusNotModified)
+	returnData(rctx, domain, project, labels, 0, 0)
 }
 
 //List TODO pagination
@@ -132,14 +108,10 @@ func (r *KVResource) List(rctx *restful.Context) {
 		WriteErrResponse(rctx, http.StatusInternalServerError, MsgDomainMustNotBeEmpty, common.ContentTypeText)
 		return
 	}
-	labelSlice := rctx.Req.QueryParameters("label")
-	var labels map[string]string
-	if len(labelSlice) != 0 {
-		labels, err = GetLabels(labelSlice)
-		if err != nil {
-			WriteErrResponse(rctx, http.StatusBadRequest, MsgIllegalLabels, common.ContentTypeText)
-			return
-		}
+	labels, err := getLabels(rctx)
+	if err != nil {
+		WriteErrResponse(rctx, http.StatusBadRequest, err.Error(), common.ContentTypeText)
+		return
 	}
 	limitStr := rctx.ReadPathParameter("limit")
 	offsetStr := rctx.ReadPathParameter("offset")
@@ -148,27 +120,64 @@ func (r *KVResource) List(rctx *restful.Context) {
 		WriteErrResponse(rctx, http.StatusBadRequest, err.Error(), common.ContentTypeText)
 		return
 	}
-	waitStr := getWaitDuration(rctx)
-	d, err := time.ParseDuration(waitStr)
-	if err != nil || d > 5*time.Minute {
-		WriteErrResponse(rctx, http.StatusBadRequest, MsgInvalidWait, common.ContentTypeText)
-		return
-	}
-	if d == 0 {
-		queryAndResponse(rctx, domain, project, "", labels, int(limit), int(offset))
-		return
-	}
-	changed := wait(d, rctx, &pubsub.Topic{
-		Labels:   labels,
-		Project:  project,
-		DomainID: domain.(string),
-	})
-	if changed {
-		queryAndResponse(rctx, domain, project, "", labels, int(limit), int(offset))
-		return
-	}
-	rctx.WriteHeader(http.StatusNotModified)
+	returnData(rctx, domain, project, labels, limit, offset)
+}
 
+func returnData(rctx *restful.Context, domain interface{}, project string, labels map[string]string, limit int64, offset int64) {
+	revStr := rctx.ReadQueryParameter(common.QueryParamRev)
+	wait := rctx.ReadQueryParameter(common.QueryParamWait)
+	if revStr == "" {
+		if wait == "" {
+			queryAndResponse(rctx, domain, project, "", labels, int(limit), int(offset))
+			return
+		}
+		changed, err := eventHappened(rctx, wait, &pubsub.Topic{
+			Labels:   labels,
+			Project:  project,
+			DomainID: domain.(string),
+		})
+		if err != nil {
+			WriteErrResponse(rctx, http.StatusBadRequest, err.Error(), common.ContentTypeText)
+			return
+		}
+		if changed {
+			queryAndResponse(rctx, domain, project, "", labels, int(limit), int(offset))
+			return
+		}
+		rctx.WriteHeader(http.StatusNotModified)
+	} else {
+		revised, err := isRevised(rctx.Ctx, revStr)
+		if err != nil {
+			if err == ErrInvalidRev {
+				WriteErrResponse(rctx, http.StatusBadRequest, err.Error(), common.ContentTypeText)
+				return
+			}
+			WriteErrResponse(rctx, http.StatusInternalServerError, err.Error(), common.ContentTypeText)
+			return
+		}
+		if revised {
+			queryAndResponse(rctx, domain, project, "", labels, int(limit), int(offset))
+			return
+		} else if wait != "" {
+			changed, err := eventHappened(rctx, wait, &pubsub.Topic{
+				Labels:   labels,
+				Project:  project,
+				DomainID: domain.(string),
+			})
+			if err != nil {
+				WriteErrResponse(rctx, http.StatusBadRequest, err.Error(), common.ContentTypeText)
+				return
+			}
+			if changed {
+				queryAndResponse(rctx, domain, project, "", labels, int(limit), int(offset))
+				return
+			}
+			rctx.WriteHeader(http.StatusNotModified)
+			return
+		} else {
+			rctx.WriteHeader(http.StatusNotModified)
+		}
+	}
 }
 
 //Search search key only by label
@@ -279,7 +288,7 @@ func (r *KVResource) URLPatterns() []restful.Route {
 			ResourceFunc: r.GetByKey,
 			FuncDesc:     "get key values by key and labels",
 			Parameters: []*restful.Parameters{
-				DocPathProject, DocPathKey, DocQueryLabelParameters, DocQueryMatch,
+				DocPathProject, DocPathKey, DocQueryLabelParameters, DocQueryMatch, DocQueryRev,
 			},
 			Returns: []*restful.Returns{
 				{
diff --git a/server/resource/v1/kv_resource_test.go b/server/resource/v1/kv_resource_test.go
index 85ed987..4354a3a 100644
--- a/server/resource/v1/kv_resource_test.go
+++ b/server/resource/v1/kv_resource_test.go
@@ -20,6 +20,7 @@ package v1_test
 import (
 	"bytes"
 	"encoding/json"
+	common2 "github.com/apache/servicecomb-kie/pkg/common"
 	"github.com/apache/servicecomb-kie/pkg/model"
 	"github.com/apache/servicecomb-kie/server/config"
 	handler2 "github.com/apache/servicecomb-kie/server/handler"
@@ -36,6 +37,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"testing"
+	"time"
 
 	_ "github.com/apache/servicecomb-kie/server/service/mongo"
 )
@@ -129,6 +131,7 @@ func TestKVResource_List(t *testing.T) {
 		assert.NoError(t, err)
 		assert.Equal(t, 2, len(result.Data))
 	})
+	var rev string
 	t.Run("list kv by service label, exact match,should return 1 kv", func(t *testing.T) {
 		r, _ := http.NewRequest("GET", "/v1/test/kie/kv?label=service:utService&match=exact", nil)
 		noopH := &handler2.NoopAuthHandler{}
@@ -145,8 +148,78 @@ func TestKVResource_List(t *testing.T) {
 		err = json.Unmarshal(body, result)
 		assert.NoError(t, err)
 		assert.Equal(t, 1, len(result.Data))
+		rev = resp.Header().Get(common2.HeaderRevision)
+	})
+	t.Run("list kv by service label, with current rev param,should return 304 ", func(t *testing.T) {
+		r, _ := http.NewRequest("GET", "/v1/test/kie/kv?label=service:utService&"+common2.QueryParamRev+"="+rev, nil)
+		noopH := &handler2.NoopAuthHandler{}
+		chain, _ := handler.CreateChain(common.Provider, "testchain1", noopH.Name())
+		r.Header.Set("Content-Type", "application/json")
+		kvr := &v1.KVResource{}
+		c, err := restfultest.New(kvr, chain)
+		assert.NoError(t, err)
+		resp := httptest.NewRecorder()
+		c.ServeHTTP(resp, r)
+		assert.NoError(t, err)
+		assert.Equal(t, http.StatusNotModified, resp.Result().StatusCode)
+	})
+	t.Run("list kv by service label, with old rev param,should return latest revision", func(t *testing.T) {
+		r, _ := http.NewRequest("GET", "/v1/test/kie/kv?label=service:utService&"+common2.QueryParamRev+"=1", nil)
+		noopH := &handler2.NoopAuthHandler{}
+		chain, _ := handler.CreateChain(common.Provider, "testchain1", noopH.Name())
+		r.Header.Set("Content-Type", "application/json")
+		kvr := &v1.KVResource{}
+		c, err := restfultest.New(kvr, chain)
+		assert.NoError(t, err)
+		resp := httptest.NewRecorder()
+		c.ServeHTTP(resp, r)
+		assert.NoError(t, err)
+		assert.Equal(t, http.StatusOK, resp.Result().StatusCode)
+	})
+	t.Run("list kv by service label, with wait and old rev param,should return latest revision,no wait", func(t *testing.T) {
+		r, _ := http.NewRequest("GET", "/v1/test/kie/kv?label=service:utService&wait=1s&"+common2.QueryParamRev+"=1", nil)
+		noopH := &handler2.NoopAuthHandler{}
+		chain, _ := handler.CreateChain(common.Provider, "testchain1", noopH.Name())
+		r.Header.Set("Content-Type", "application/json")
+		kvr := &v1.KVResource{}
+		c, err := restfultest.New(kvr, chain)
+		assert.NoError(t, err)
+		resp := httptest.NewRecorder()
+		start := time.Now()
+		c.ServeHTTP(resp, r)
+		duration := time.Since(start)
+		t.Log(duration)
+		assert.Equal(t, http.StatusOK, resp.Result().StatusCode)
+	})
+	t.Run("list kv by service label, with wait and current rev param,should wait and return 304 ", func(t *testing.T) {
+		r, _ := http.NewRequest("GET", "/v1/test/kie/kv?label=service:utService&wait=1s&"+common2.QueryParamRev+"="+rev, nil)
+		noopH := &handler2.NoopAuthHandler{}
+		chain, _ := handler.CreateChain(common.Provider, "testchain1", noopH.Name())
+		r.Header.Set("Content-Type", "application/json")
+		kvr := &v1.KVResource{}
+		c, err := restfultest.New(kvr, chain)
+		assert.NoError(t, err)
+		resp := httptest.NewRecorder()
+		start := time.Now()
+		c.ServeHTTP(resp, r)
+		duration := time.Since(start)
+		t.Log(duration)
+		assert.Equal(t, http.StatusNotModified, resp.Result().StatusCode)
+	})
+	t.Run("list kv by service label, with wait param,will too 1s and return 304", func(t *testing.T) {
+		r, _ := http.NewRequest("GET", "/v1/test/kie/kv?label=service:utService&wait=1s", nil)
+		noopH := &handler2.NoopAuthHandler{}
+		chain, _ := handler.CreateChain(common.Provider, "testchain1", noopH.Name())
+		r.Header.Set("Content-Type", "application/json")
+		kvr := &v1.KVResource{}
+		c, err := restfultest.New(kvr, chain)
+		assert.NoError(t, err)
+		resp := httptest.NewRecorder()
+		start := time.Now()
+		c.ServeHTTP(resp, r)
+		duration := time.Since(start)
+		t.Log(duration)
 	})
-
 }
 func TestKVResource_GetByKey(t *testing.T) {
 	t.Run("get one key by label, exact match,should return 1 kv", func(t *testing.T) {