You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2020/01/14 01:49:00 UTC
[servicecomb-kie] branch master updated: SCB-1718 support use
revision param to query data (#69)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git
The following commit(s) were added to refs/heads/master by this push:
new 545ebf7 SCB-1718 support use revision param to query data (#69)
545ebf7 is described below
commit 545ebf73ec3487a44c56b2dd17f626ccde064d55
Author: Shawn <xi...@gmail.com>
AuthorDate: Tue Jan 14 09:48:52 2020 +0800
SCB-1718 support use revision param to query data (#69)
---
examples/dev/db.js | 2 +-
server/resource/v1/common.go | 94 ++++++++++++-----------
server/resource/v1/common_test.go | 8 --
server/resource/v1/doc_struct.go | 9 +++
server/resource/v1/kv_resource.go | 135 ++++++++++++++++++---------------
server/resource/v1/kv_resource_test.go | 75 +++++++++++++++++-
6 files changed, 207 insertions(+), 116 deletions(-)
diff --git a/examples/dev/db.js b/examples/dev/db.js
index afd91c3..17f6cef 100644
--- a/examples/dev/db.js
+++ b/examples/dev/db.js
@@ -47,4 +47,4 @@ db.createCollection( "kv", {
} }
} );
db.kv.createIndex({"id": 1}, { unique: true } );
-db.kv.createIndex({key: 1, label_id: 1},{ unique: true });
\ No newline at end of file
+db.kv.createIndex({key: 1, label_id: 1,domain:1,project:1},{ unique: true });
\ No newline at end of file
diff --git a/server/resource/v1/common.go b/server/resource/v1/common.go
index d529e79..85e3e41 100644
--- a/server/resource/v1/common.go
+++ b/server/resource/v1/common.go
@@ -18,9 +18,9 @@
package v1
import (
+ "context"
"encoding/json"
"errors"
- "fmt"
"github.com/apache/servicecomb-kie/server/pubsub"
"github.com/apache/servicecomb-kie/server/service"
uuid "github.com/satori/go.uuid"
@@ -30,8 +30,6 @@ import (
"time"
"github.com/apache/servicecomb-kie/pkg/common"
- "github.com/apache/servicecomb-kie/pkg/model"
-
goRestful "github.com/emicklei/go-restful"
"github.com/go-chassis/go-chassis/server/restful"
"github.com/go-mesh/openlogging"
@@ -46,7 +44,14 @@ const (
"label can not be duplicated, please check query parameters"
MsgIllegalDepth = "X-Depth must be number"
MsgInvalidWait = "wait param should be formed with number and time unit like 5s,100ms, and less than 5m"
+ MsgInvalidRev = "revision param should be formed with number greater than 0"
ErrKvIDMustNotEmpty = "must supply kv id if you want to remove key"
+
+ MaxWait = 5 * time.Minute
+)
+
+var (
+ ErrInvalidRev = errors.New(MsgInvalidRev)
)
//ReadDomain get domain info from attribute
@@ -103,12 +108,6 @@ func WriteErrResponse(context *restful.Context, status int, msg, contentType str
}
-//InfoLog record info
-func InfoLog(action string, kv *model.KVDoc) {
- openlogging.Info(
- fmt.Sprintf("[%s] [%s] success", action, kv.Key))
-}
-
func readRequest(ctx *restful.Context, v interface{}) error {
if ctx.ReadHeader(common.HeaderContentType) == common.ContentTypeYaml {
return yaml.NewDecoder(ctx.ReadRequest().Body).Decode(v)
@@ -132,11 +131,13 @@ func writeResponse(ctx *restful.Context, v interface{}) error {
}
return ctx.WriteJSON(v, goRestful.MIME_JSON) // json is default
}
-
-//GetLabels parse labels
-func GetLabels(labelsSlice []string) (map[string]string, error) {
- labels := make(map[string]string, len(labelsSlice))
- for _, v := range labelsSlice {
+func getLabels(rctx *restful.Context) (map[string]string, error) {
+ labelSlice := rctx.Req.QueryParameters("label")
+ if len(labelSlice) == 0 {
+ return nil, nil
+ }
+ labels := make(map[string]string, len(labelSlice))
+ for _, v := range labelSlice {
v := strings.Split(v, ":")
if len(v) != 2 {
return nil, errors.New(MsgIllegalLabels)
@@ -145,45 +146,46 @@ func GetLabels(labelsSlice []string) (map[string]string, error) {
}
return labels, nil
}
-func getWaitDuration(rctx *restful.Context) string {
- wait := rctx.ReadQueryParameter(common.QueryParamWait)
- if wait == "" {
- wait = "0s"
+func isRevised(ctx context.Context, revStr string) (bool, error) {
+ rev, err := strconv.ParseInt(revStr, 10, 64)
+ if err != nil {
+ return false, ErrInvalidRev
}
- return wait
-}
-func getRevision(rctx *restful.Context) string {
- rev := rctx.ReadQueryParameter(common.QueryParamRev)
- if rev == "" {
- rev = "0"
+ latest, err := service.RevisionService.GetRevision(ctx)
+ if err != nil {
+ return false, err
+ }
+ if latest > rev {
+ return true, nil
}
- return rev
+ return false, nil
}
func getMatchPattern(rctx *restful.Context) string {
m := rctx.ReadQueryParameter(common.QueryParamMatch)
-
if m != "" && m != PatternExact {
return ""
}
return m
}
-func wait(d time.Duration, rctx *restful.Context, topic *pubsub.Topic) bool {
- changed := true
- if d != 0 {
- o := &pubsub.Observer{
- UUID: uuid.NewV4().String(),
- RemoteIP: rctx.ReadRequest().RemoteAddr, //TODO x forward ip
- UserAgent: rctx.ReadHeader("User-Agent"),
- Event: make(chan *pubsub.KVChangeEvent, 1),
- }
- pubsub.ObserveOnce(o, topic)
- select {
- case <-time.After(d):
- changed = false
- case <-o.Event:
- }
- }
- return changed
+func eventHappened(rctx *restful.Context, waitStr string, topic *pubsub.Topic) (bool, error) {
+ d, err := time.ParseDuration(waitStr)
+ if err != nil || d > MaxWait {
+ return false, errors.New(MsgInvalidWait)
+ }
+ happened := true
+ o := &pubsub.Observer{
+ UUID: uuid.NewV4().String(),
+ RemoteIP: rctx.ReadRequest().RemoteAddr, //TODO x forward ip
+ UserAgent: rctx.ReadHeader("User-Agent"),
+ Event: make(chan *pubsub.KVChangeEvent, 1),
+ }
+ pubsub.ObserveOnce(o, topic)
+ select {
+ case <-time.After(d):
+ happened = false
+ case <-o.Event:
+ }
+ return happened, nil
}
func checkPagination(limitStr, offsetStr string) (int64, int64, error) {
var err error
@@ -226,6 +228,12 @@ func queryAndResponse(rctx *restful.Context,
WriteErrResponse(rctx, http.StatusInternalServerError, err.Error(), common.ContentTypeText)
return
}
+ rev, err := service.RevisionService.GetRevision(rctx.Ctx)
+ if err != nil {
+ WriteErrResponse(rctx, http.StatusInternalServerError, err.Error(), common.ContentTypeText)
+ return
+ }
+ rctx.ReadResponseWriter().Header().Set(common.HeaderRevision, strconv.FormatInt(rev, 10))
err = writeResponse(rctx, kv)
if err != nil {
openlogging.Error(err.Error())
diff --git a/server/resource/v1/common_test.go b/server/resource/v1/common_test.go
index d1ba7a7..e57b805 100644
--- a/server/resource/v1/common_test.go
+++ b/server/resource/v1/common_test.go
@@ -42,12 +42,4 @@ func TestGetLabels(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(c))
- r, err = http.NewRequest("GET",
- "/kv?label=app:mall&label=service:payment",
- nil)
- assert.NoError(t, err)
- req := restful.NewRequest(r)
- m, err := v1.GetLabels(req.QueryParameters("label"))
- assert.NoError(t, err)
- assert.Equal(t, 2, len(m))
}
diff --git a/server/resource/v1/doc_struct.go b/server/resource/v1/doc_struct.go
index 5ad7a4e..47635ce 100644
--- a/server/resource/v1/doc_struct.go
+++ b/server/resource/v1/doc_struct.go
@@ -59,6 +59,15 @@ var (
"for example wait=5s, server will not response until 5 seconds during that time window, " +
"if any kv changed, server will return 200 and kv list, otherwise return 304 and empty body",
}
+ DocQueryRev = &restful.Parameters{
+ DataType: "string",
+ Name: common.QueryParamRev,
+ ParamType: goRestful.QueryParameterKind,
+ Required: false,
+ Desc: "each time you query,server will return a number in header X-Kie-Revision. " +
+ "you can record it in client side, use this number as param value. " +
+ "if current revision is greater than it, server will return data",
+ }
DocQueryMatch = &restful.Parameters{
DataType: "string",
Name: common.QueryParamMatch,
diff --git a/server/resource/v1/kv_resource.go b/server/resource/v1/kv_resource.go
index f5335b1..02db656 100644
--- a/server/resource/v1/kv_resource.go
+++ b/server/resource/v1/kv_resource.go
@@ -20,16 +20,14 @@ package v1
import (
"fmt"
- "github.com/apache/servicecomb-kie/server/pubsub"
- "net/http"
- "time"
-
"github.com/apache/servicecomb-kie/pkg/common"
"github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/apache/servicecomb-kie/server/pubsub"
"github.com/apache/servicecomb-kie/server/service"
goRestful "github.com/emicklei/go-restful"
"github.com/go-chassis/go-chassis/server/restful"
"github.com/go-mesh/openlogging"
+ "net/http"
)
//KVResource has API about kv operations
@@ -49,6 +47,7 @@ func (r *KVResource) Put(context *restful.Context) {
domain := ReadDomain(context)
if domain == nil {
WriteErrResponse(context, http.StatusInternalServerError, MsgDomainMustNotBeEmpty, common.ContentTypeText)
+ return
}
kv.Key = key
kv.Domain = domain.(string)
@@ -69,7 +68,8 @@ func (r *KVResource) Put(context *restful.Context) {
if err != nil {
openlogging.Warn("lost kv change event:" + err.Error())
}
- InfoLog("put", kv)
+ openlogging.Info(
+ fmt.Sprintf("put [%s] success", kv.Key))
err = writeResponse(context, kv)
if err != nil {
openlogging.Error(err.Error())
@@ -86,41 +86,17 @@ func (r *KVResource) GetByKey(rctx *restful.Context) {
return
}
project := rctx.ReadPathParameter("project")
- labelSlice := rctx.Req.QueryParameters("label")
- var labels map[string]string
- if len(labelSlice) != 0 {
- labels, err = GetLabels(labelSlice)
- if err != nil {
- WriteErrResponse(rctx, http.StatusBadRequest, MsgIllegalLabels, common.ContentTypeText)
- return
- }
+ labels, err := getLabels(rctx)
+ if err != nil {
+ WriteErrResponse(rctx, http.StatusBadRequest, MsgIllegalLabels, common.ContentTypeText)
+ return
}
domain := ReadDomain(rctx)
if domain == nil {
WriteErrResponse(rctx, http.StatusInternalServerError, MsgDomainMustNotBeEmpty, common.ContentTypeText)
return
}
- waitStr := getWaitDuration(rctx)
- d, err := time.ParseDuration(waitStr)
- if err != nil || d > 5*time.Minute {
- WriteErrResponse(rctx, http.StatusBadRequest, MsgInvalidWait, common.ContentTypeText)
- return
- }
- if d == 0 {
- queryAndResponse(rctx, domain, project, key, labels, 0, 0)
- return
- }
- changed := wait(d, rctx, &pubsub.Topic{
- Key: key,
- Labels: labels,
- Project: project,
- DomainID: domain.(string),
- })
- if changed {
- queryAndResponse(rctx, domain, project, key, labels, 0, 0)
- return
- }
- rctx.WriteHeader(http.StatusNotModified)
+ returnData(rctx, domain, project, labels, 0, 0)
}
//List TODO pagination
@@ -132,14 +108,10 @@ func (r *KVResource) List(rctx *restful.Context) {
WriteErrResponse(rctx, http.StatusInternalServerError, MsgDomainMustNotBeEmpty, common.ContentTypeText)
return
}
- labelSlice := rctx.Req.QueryParameters("label")
- var labels map[string]string
- if len(labelSlice) != 0 {
- labels, err = GetLabels(labelSlice)
- if err != nil {
- WriteErrResponse(rctx, http.StatusBadRequest, MsgIllegalLabels, common.ContentTypeText)
- return
- }
+ labels, err := getLabels(rctx)
+ if err != nil {
+ WriteErrResponse(rctx, http.StatusBadRequest, err.Error(), common.ContentTypeText)
+ return
}
limitStr := rctx.ReadPathParameter("limit")
offsetStr := rctx.ReadPathParameter("offset")
@@ -148,27 +120,64 @@ func (r *KVResource) List(rctx *restful.Context) {
WriteErrResponse(rctx, http.StatusBadRequest, err.Error(), common.ContentTypeText)
return
}
- waitStr := getWaitDuration(rctx)
- d, err := time.ParseDuration(waitStr)
- if err != nil || d > 5*time.Minute {
- WriteErrResponse(rctx, http.StatusBadRequest, MsgInvalidWait, common.ContentTypeText)
- return
- }
- if d == 0 {
- queryAndResponse(rctx, domain, project, "", labels, int(limit), int(offset))
- return
- }
- changed := wait(d, rctx, &pubsub.Topic{
- Labels: labels,
- Project: project,
- DomainID: domain.(string),
- })
- if changed {
- queryAndResponse(rctx, domain, project, "", labels, int(limit), int(offset))
- return
- }
- rctx.WriteHeader(http.StatusNotModified)
+ returnData(rctx, domain, project, labels, limit, offset)
+}
+func returnData(rctx *restful.Context, domain interface{}, project string, labels map[string]string, limit int64, offset int64) {
+ revStr := rctx.ReadQueryParameter(common.QueryParamRev)
+ wait := rctx.ReadQueryParameter(common.QueryParamWait)
+ if revStr == "" {
+ if wait == "" {
+ queryAndResponse(rctx, domain, project, "", labels, int(limit), int(offset))
+ return
+ }
+ changed, err := eventHappened(rctx, wait, &pubsub.Topic{
+ Labels: labels,
+ Project: project,
+ DomainID: domain.(string),
+ })
+ if err != nil {
+ WriteErrResponse(rctx, http.StatusBadRequest, err.Error(), common.ContentTypeText)
+ return
+ }
+ if changed {
+ queryAndResponse(rctx, domain, project, "", labels, int(limit), int(offset))
+ return
+ }
+ rctx.WriteHeader(http.StatusNotModified)
+ } else {
+ revised, err := isRevised(rctx.Ctx, revStr)
+ if err != nil {
+ if err == ErrInvalidRev {
+ WriteErrResponse(rctx, http.StatusBadRequest, err.Error(), common.ContentTypeText)
+ return
+ }
+ WriteErrResponse(rctx, http.StatusInternalServerError, err.Error(), common.ContentTypeText)
+ return
+ }
+ if revised {
+ queryAndResponse(rctx, domain, project, "", labels, int(limit), int(offset))
+ return
+ } else if wait != "" {
+ changed, err := eventHappened(rctx, wait, &pubsub.Topic{
+ Labels: labels,
+ Project: project,
+ DomainID: domain.(string),
+ })
+ if err != nil {
+ WriteErrResponse(rctx, http.StatusBadRequest, err.Error(), common.ContentTypeText)
+ return
+ }
+ if changed {
+ queryAndResponse(rctx, domain, project, "", labels, int(limit), int(offset))
+ return
+ }
+ rctx.WriteHeader(http.StatusNotModified)
+ return
+ } else {
+ rctx.WriteHeader(http.StatusNotModified)
+ }
+ }
}
//Search search key only by label
@@ -279,7 +288,7 @@ func (r *KVResource) URLPatterns() []restful.Route {
ResourceFunc: r.GetByKey,
FuncDesc: "get key values by key and labels",
Parameters: []*restful.Parameters{
- DocPathProject, DocPathKey, DocQueryLabelParameters, DocQueryMatch,
+ DocPathProject, DocPathKey, DocQueryLabelParameters, DocQueryMatch, DocQueryRev,
},
Returns: []*restful.Returns{
{
diff --git a/server/resource/v1/kv_resource_test.go b/server/resource/v1/kv_resource_test.go
index 85ed987..4354a3a 100644
--- a/server/resource/v1/kv_resource_test.go
+++ b/server/resource/v1/kv_resource_test.go
@@ -20,6 +20,7 @@ package v1_test
import (
"bytes"
"encoding/json"
+ common2 "github.com/apache/servicecomb-kie/pkg/common"
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/server/config"
handler2 "github.com/apache/servicecomb-kie/server/handler"
@@ -36,6 +37,7 @@ import (
"net/http"
"net/http/httptest"
"testing"
+ "time"
_ "github.com/apache/servicecomb-kie/server/service/mongo"
)
@@ -129,6 +131,7 @@ func TestKVResource_List(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 2, len(result.Data))
})
+ var rev string
t.Run("list kv by service label, exact match,should return 1 kv", func(t *testing.T) {
r, _ := http.NewRequest("GET", "/v1/test/kie/kv?label=service:utService&match=exact", nil)
noopH := &handler2.NoopAuthHandler{}
@@ -145,8 +148,78 @@ func TestKVResource_List(t *testing.T) {
err = json.Unmarshal(body, result)
assert.NoError(t, err)
assert.Equal(t, 1, len(result.Data))
+ rev = resp.Header().Get(common2.HeaderRevision)
+ })
+ t.Run("list kv by service label, with current rev param,should return 304 ", func(t *testing.T) {
+ r, _ := http.NewRequest("GET", "/v1/test/kie/kv?label=service:utService&"+common2.QueryParamRev+"="+rev, nil)
+ noopH := &handler2.NoopAuthHandler{}
+ chain, _ := handler.CreateChain(common.Provider, "testchain1", noopH.Name())
+ r.Header.Set("Content-Type", "application/json")
+ kvr := &v1.KVResource{}
+ c, err := restfultest.New(kvr, chain)
+ assert.NoError(t, err)
+ resp := httptest.NewRecorder()
+ c.ServeHTTP(resp, r)
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusNotModified, resp.Result().StatusCode)
+ })
+ t.Run("list kv by service label, with old rev param,should return latest revision", func(t *testing.T) {
+ r, _ := http.NewRequest("GET", "/v1/test/kie/kv?label=service:utService&"+common2.QueryParamRev+"=1", nil)
+ noopH := &handler2.NoopAuthHandler{}
+ chain, _ := handler.CreateChain(common.Provider, "testchain1", noopH.Name())
+ r.Header.Set("Content-Type", "application/json")
+ kvr := &v1.KVResource{}
+ c, err := restfultest.New(kvr, chain)
+ assert.NoError(t, err)
+ resp := httptest.NewRecorder()
+ c.ServeHTTP(resp, r)
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusOK, resp.Result().StatusCode)
+ })
+ t.Run("list kv by service label, with wait and old rev param,should return latest revision,no wait", func(t *testing.T) {
+ r, _ := http.NewRequest("GET", "/v1/test/kie/kv?label=service:utService&wait=1s&"+common2.QueryParamRev+"=1", nil)
+ noopH := &handler2.NoopAuthHandler{}
+ chain, _ := handler.CreateChain(common.Provider, "testchain1", noopH.Name())
+ r.Header.Set("Content-Type", "application/json")
+ kvr := &v1.KVResource{}
+ c, err := restfultest.New(kvr, chain)
+ assert.NoError(t, err)
+ resp := httptest.NewRecorder()
+ start := time.Now()
+ c.ServeHTTP(resp, r)
+ duration := time.Since(start)
+ t.Log(duration)
+ assert.Equal(t, http.StatusOK, resp.Result().StatusCode)
+ })
+ t.Run("list kv by service label, with wait and current rev param,should wait and return 304 ", func(t *testing.T) {
+ r, _ := http.NewRequest("GET", "/v1/test/kie/kv?label=service:utService&wait=1s&"+common2.QueryParamRev+"="+rev, nil)
+ noopH := &handler2.NoopAuthHandler{}
+ chain, _ := handler.CreateChain(common.Provider, "testchain1", noopH.Name())
+ r.Header.Set("Content-Type", "application/json")
+ kvr := &v1.KVResource{}
+ c, err := restfultest.New(kvr, chain)
+ assert.NoError(t, err)
+ resp := httptest.NewRecorder()
+ start := time.Now()
+ c.ServeHTTP(resp, r)
+ duration := time.Since(start)
+ t.Log(duration)
+ assert.Equal(t, http.StatusNotModified, resp.Result().StatusCode)
+ })
+ t.Run("list kv by service label, with wait param,will too 1s and return 304", func(t *testing.T) {
+ r, _ := http.NewRequest("GET", "/v1/test/kie/kv?label=service:utService&wait=1s", nil)
+ noopH := &handler2.NoopAuthHandler{}
+ chain, _ := handler.CreateChain(common.Provider, "testchain1", noopH.Name())
+ r.Header.Set("Content-Type", "application/json")
+ kvr := &v1.KVResource{}
+ c, err := restfultest.New(kvr, chain)
+ assert.NoError(t, err)
+ resp := httptest.NewRecorder()
+ start := time.Now()
+ c.ServeHTTP(resp, r)
+ duration := time.Since(start)
+ t.Log(duration)
})
-
}
func TestKVResource_GetByKey(t *testing.T) {
t.Run("get one key by label, exact match,should return 1 kv", func(t *testing.T) {